Merge pull request #6952 from influxdata/dom/namespace-soft-delete-catalog

feat: namespace soft-delete support
pull/24376/head
kodiakhq[bot] 2023-02-13 12:21:24 +00:00 committed by GitHub
commit 2277cd80f5
30 changed files with 769 additions and 302 deletions

View File

@ -6,7 +6,7 @@ use data_types::{
ColumnType, ColumnTypeCount, Namespace, NamespaceId, PartitionId, PartitionKey, PartitionParam,
ShardId, Table, TableId, TableSchema, Timestamp,
};
use iox_catalog::interface::{get_schema_by_id, Catalog};
use iox_catalog::interface::{get_schema_by_id, Catalog, SoftDeletedRows};
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use metric::{
@ -419,11 +419,11 @@ impl Compactor {
for id in namespace_ids {
let namespace = repos
.namespaces()
.get_by_id(id)
.get_by_id(id, SoftDeletedRows::AllRows)
.await
.context(QueryingNamespaceSnafu)?
.context(NamespaceNotFoundSnafu { namespace_id: id })?;
let schema = get_schema_by_id(namespace.id, repos.as_mut())
let schema = get_schema_by_id(namespace.id, repos.as_mut(), SoftDeletedRows::AllRows)
.await
.context(QueryingNamespaceSnafu)?;
namespaces.insert(id, (Arc::new(namespace), schema));

View File

@ -3,7 +3,7 @@ use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{Namespace, NamespaceId, NamespaceSchema};
use iox_catalog::interface::{get_schema_by_id, Catalog};
use iox_catalog::interface::{get_schema_by_id, Catalog, SoftDeletedRows};
use super::NamespacesSource;
@ -37,7 +37,7 @@ impl NamespacesSource for CatalogNamespacesSource {
.repositories()
.await
.namespaces()
.get_by_id(ns)
.get_by_id(ns, SoftDeletedRows::AllRows)
.await
})
.await
@ -48,7 +48,7 @@ impl NamespacesSource for CatalogNamespacesSource {
Backoff::new(&self.backoff_config)
.retry_all_errors("namespace_of_given_namespace_id", || async {
let mut repos = self.catalog.repositories().await;
let res = get_schema_by_id(ns, repos.as_mut()).await;
let res = get_schema_by_id(ns, repos.as_mut(), SoftDeletedRows::AllRows).await;
match res {
Ok(schema) => Ok(Some(schema)),
Err(iox_catalog::interface::Error::NamespaceNotFoundById { .. }) => Ok(None),

View File

@ -6,7 +6,9 @@ use data_types::{
Partition, PartitionKey, QueryPoolId, ShardId, TableSchema, TopicId,
};
use influxdb_iox_client::connection::{Connection, GrpcConnection};
use iox_catalog::interface::{get_schema_by_name, CasFailure, Catalog, RepoCollection};
use iox_catalog::interface::{
get_schema_by_name, CasFailure, Catalog, RepoCollection, SoftDeletedRows,
};
use schema::{
sort::{adjust_sort_key_columns, SortKey, SortKeyBuilder},
InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME,
@ -65,7 +67,13 @@ pub async fn update_iox_catalog<'a>(
org_and_bucket_to_namespace(&merged_tsm_schema.org_id, &merged_tsm_schema.bucket_id)
.map_err(UpdateCatalogError::InvalidOrgBucket)?;
let mut repos = catalog.repositories().await;
let iox_schema = match get_schema_by_name(namespace_name.as_str(), repos.deref_mut()).await {
let iox_schema = match get_schema_by_name(
namespace_name.as_str(),
repos.deref_mut(),
SoftDeletedRows::AllRows,
)
.await
{
Ok(iox_schema) => iox_schema,
Err(iox_catalog::interface::Error::NamespaceNotFoundByName { .. }) => {
// create the namespace
@ -80,7 +88,13 @@ pub async fn update_iox_catalog<'a>(
.await?;
// fetch the newly-created schema (which will be empty except for the time column,
// which won't impact the merge we're about to do)
match get_schema_by_name(namespace_name.as_str(), repos.deref_mut()).await {
match get_schema_by_name(
namespace_name.as_str(),
repos.deref_mut(),
SoftDeletedRows::AllRows,
)
.await
{
Ok(iox_schema) => iox_schema,
Err(e) => return Err(UpdateCatalogError::CatalogError(e)),
}
@ -149,7 +163,7 @@ where
// presumably it got created in the meantime?
repos
.namespaces()
.get_by_name(name)
.get_by_name(name, SoftDeletedRows::ExcludeDeleted)
.await
.map_err(UpdateCatalogError::CatalogError)?
.ok_or_else(|| UpdateCatalogError::NamespaceNotFound(name.to_string()))
@ -534,7 +548,11 @@ mod tests {
.await
.expect("schema update worked");
let mut repos = catalog.repositories().await;
let iox_schema = get_schema_by_name("1234_5678", repos.deref_mut())
let iox_schema = get_schema_by_name(
"1234_5678",
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.expect("got schema");
assert_eq!(iox_schema.tables.len(), 1);
@ -653,7 +671,11 @@ mod tests {
.await
.expect("schema update worked");
let mut repos = catalog.repositories().await;
let iox_schema = get_schema_by_name("1234_5678", repos.deref_mut())
let iox_schema = get_schema_by_name(
"1234_5678",
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.expect("got schema");
assert_eq!(iox_schema.tables.len(), 1);

View File

@ -18,7 +18,7 @@ use influxdb_iox_client::{
schema::{self, generated_types::NamespaceSchema},
store,
};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use parquet_file::ParquetFilePath;
use std::sync::Arc;
use thiserror::Error;
@ -254,7 +254,7 @@ async fn load_schema(
Ok(n) => n,
Err(iox_catalog::interface::Error::NameExists { .. }) => repos
.namespaces()
.get_by_name(namespace)
.get_by_name(namespace, SoftDeletedRows::ExcludeDeleted)
.await?
.ok_or(Error::NamespaceNotFound)?,
e => e?,
@ -285,7 +285,12 @@ async fn load_schema(
println!("table {table_name} with columns {column_names} loaded into local catalog");
}
let full_inserted_schema = get_schema_by_name(&namespace.name, repos.as_mut()).await?;
let full_inserted_schema = get_schema_by_name(
&namespace.name,
repos.as_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await?;
Ok(full_inserted_schema)
}

View File

@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
use backoff::{Backoff, BackoffConfig};
use data_types::NamespaceId;
use iox_catalog::interface::Catalog;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use super::NamespaceName;
use crate::deferred_load::DeferredLoad;
@ -46,7 +46,10 @@ impl NamespaceNameResolver {
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
// Accept soft-deleted namespaces so that any writes for them can
// still flow through the system. Eventually the routers will
// prevent new writes to a deleted namespace.
.get_by_id(namespace_id, SoftDeletedRows::AllRows)
.await?
.expect("resolving namespace name for non-existent namespace id")
.name

View File

@ -12,7 +12,11 @@ use ingester::{
lifecycle::LifecycleConfig,
querier_handler::IngesterQueryResponse,
};
use iox_catalog::{interface::Catalog, mem::MemCatalog, validate_or_insert_schema};
use iox_catalog::{
interface::{Catalog, SoftDeletedRows},
mem::MemCatalog,
validate_or_insert_schema,
};
use iox_query::exec::Executor;
use iox_time::TimeProvider;
use metric::{Attributes, Metric, MetricObserver};
@ -224,7 +228,7 @@ impl TestContext {
.repositories()
.await
.namespaces()
.get_by_name(namespace)
.get_by_name(namespace, SoftDeletedRows::AllRows)
.await
.expect("should be able to get namespace by name")
.expect("namespace does not exist")
@ -304,7 +308,7 @@ impl TestContext {
.repositories()
.await
.namespaces()
.get_by_name(namespace)
.get_by_name(namespace, SoftDeletedRows::AllRows)
.await
.expect("should be able to get namespace by name")
.expect("namespace does not exist")

View File

@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};
use backoff::{Backoff, BackoffConfig};
use data_types::NamespaceId;
use iox_catalog::interface::Catalog;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use super::NamespaceName;
use crate::deferred_load::DeferredLoad;
@ -46,7 +46,10 @@ impl NamespaceNameResolver {
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
// Accept soft-deleted namespaces so that any writes for them can
// still flow through the system. Eventually the routers will
// prevent new writes to a deleted namespace.
.get_by_id(namespace_id, SoftDeletedRows::AllRows)
.await?
.unwrap_or_else(|| {
panic!(

View File

@ -13,11 +13,11 @@ mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::{CompactionLevel, ParquetFile, PartitionKey, ShardId, SequenceNumber};
use data_types::{CompactionLevel, ParquetFile, PartitionKey, SequenceNumber, ShardId};
use dml::DmlOperation;
use futures::TryStreamExt;
use iox_catalog::{
interface::{get_schema_by_id, Catalog},
interface::{get_schema_by_id, Catalog, SoftDeletedRows},
mem::MemCatalog,
validate_or_insert_schema,
};
@ -95,7 +95,7 @@ mod tests {
.with_timeout_panic(Duration::from_secs(1))
.await;
let schema = get_schema_by_id(namespace_id, &mut *repos)
let schema = get_schema_by_id(namespace_id, &mut *repos, SoftDeletedRows::AllRows)
.await
.expect("failed to find namespace schema");

View File

@ -5,7 +5,7 @@ use crate::{
use generated_types::influxdata::iox::ingester::v1::{
self as proto, persist_service_server::PersistService,
};
use iox_catalog::interface::Catalog;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use std::sync::Arc;
use tonic::{Request, Response};
@ -50,7 +50,7 @@ where
.repositories()
.await
.namespaces()
.get_by_name(&request.namespace)
.get_by_name(&request.namespace, SoftDeletedRows::AllRows)
.await
.map_err(|e| tonic::Status::internal(e.to_string()))?
.ok_or_else(|| tonic::Status::not_found(&request.namespace))?;

View File

@ -149,6 +149,53 @@ pub enum Error {
/// A specialized `Error` for Catalog errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Specify how soft-deleted entities should affect query results.
///
/// ```text
///
///              ExcludeDeleted            OnlyDeleted
///
///                     ┃                       ┃
///               .─────╋─────.           .─────╋─────.
///            ,─'      ┃      '─.     ,─'      ┃      '─.
///          ,'         ●         `. ,'         ●          `.
///         ,'                     ,' `.                     `.
///        ;                      ;     :                      :
///        │      No deleted      │     │     Only deleted     │
///        │         rows         │  ●  │        rows          │
///        :                      :  ┃  ;                      ;
///         ╲                      ╲ ┃ ╱                      ╱
///          `.                     `┃'                     ,'
///            '─.               ,─' ┃ '─.               ,─'
///               `─────────────'    ┃    `──────────────'
///                                  ┃
///
///                              AllRows
///
/// ```
#[derive(Debug, Clone, Copy)]
pub enum SoftDeletedRows {
/// Return all rows.
AllRows,
/// Return all rows, except soft deleted rows.
ExcludeDeleted,
/// Return only soft deleted rows.
OnlyDeleted,
}
impl SoftDeletedRows {
pub(crate) fn as_sql_predicate(&self) -> &str {
match self {
Self::ExcludeDeleted => "deleted_at IS NULL",
Self::OnlyDeleted => "deleted_at IS NOT NULL",
Self::AllRows => "1=1",
}
}
}
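As a point of reference, here is a minimal sketch (not part of the diff, and the helper name is illustrative) of how the predicate composes into the namespace queries; the Postgres and SQLite repos further down build exactly this kind of string with format!():
// Illustrative only: mirrors the SQL built by the Postgres/SQLite impls below.
fn namespace_by_name_query(deleted: SoftDeletedRows) -> String {
    // ExcludeDeleted => "... AND deleted_at IS NULL"
    // OnlyDeleted    => "... AND deleted_at IS NOT NULL"
    // AllRows        => "... AND 1=1" (a no-op filter)
    format!(
        "SELECT * FROM namespace WHERE name=$1 AND {};",
        deleted.as_sql_predicate()
    )
}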
/// Methods for working with the catalog.
#[async_trait]
pub trait Catalog: Send + Sync + Debug {
@ -319,16 +366,24 @@ pub trait NamespaceRepo: Send + Sync {
) -> Result<Namespace>;
/// List all namespaces.
async fn list(&mut self) -> Result<Vec<Namespace>>;
async fn list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
/// Gets the namespace by its ID.
async fn get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>>;
async fn get_by_id(
&mut self,
id: NamespaceId,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>>;
/// Gets the namespace by its unique name.
async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>>;
async fn get_by_name(
&mut self,
name: &str,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>>;
/// Delete a namespace by name
async fn delete(&mut self, name: &str) -> Result<()>;
/// Soft-delete a namespace by name
async fn soft_delete(&mut self, name: &str) -> Result<()>;
/// Update the limit on the number of tables that can exist per namespace.
async fn update_table_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace>;
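For orientation, a hedged sketch of how the revised NamespaceRepo API is called; the many call-site changes in this PR follow this shape (the function name, namespace ID, and namespace name below are illustrative):
// Sketch only: mirrors call sites changed throughout this PR.
async fn example<R>(repos: &mut R) -> Result<()>
where
    R: RepoCollection + ?Sized,
{
    // User-facing paths (router, querier) hide soft-deleted namespaces...
    let _visible = repos
        .namespaces()
        .list(SoftDeletedRows::ExcludeDeleted)
        .await?;
    // ...while the ingester keeps resolving them so in-flight writes still land.
    let _maybe_ns = repos
        .namespaces()
        .get_by_id(NamespaceId::new(42), SoftDeletedRows::AllRows)
        .await?;
    // "Deleting" a namespace now only sets its deleted_at timestamp.
    repos.namespaces().soft_delete("bananas").await
}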
@ -755,13 +810,17 @@ pub trait ProcessedTombstoneRepo: Send + Sync {
}
/// Gets the namespace schema including all tables and columns.
pub async fn get_schema_by_id<R>(id: NamespaceId, repos: &mut R) -> Result<NamespaceSchema>
pub async fn get_schema_by_id<R>(
id: NamespaceId,
repos: &mut R,
deleted: SoftDeletedRows,
) -> Result<NamespaceSchema>
where
R: RepoCollection + ?Sized,
{
let namespace = repos
.namespaces()
.get_by_id(id)
.get_by_id(id, deleted)
.await?
.context(NamespaceNotFoundByIdSnafu { id })?;
@ -769,13 +828,17 @@ where
}
/// Gets the namespace schema including all tables and columns.
pub async fn get_schema_by_name<R>(name: &str, repos: &mut R) -> Result<NamespaceSchema>
pub async fn get_schema_by_name<R>(
name: &str,
repos: &mut R,
deleted: SoftDeletedRows,
) -> Result<NamespaceSchema>
where
R: RepoCollection + ?Sized,
{
let namespace = repos
.namespaces()
.get_by_name(name)
.get_by_name(name, deleted)
.await?
.context(NamespaceNotFoundByNameSnafu { name })?;
@ -849,6 +912,10 @@ where
/// result set. No table lock is obtained, nor are queries executed within a
/// transaction, but this method does return a point-in-time snapshot of the
/// catalog state.
///
/// # Soft Deletion
///
/// No schemas for soft-deleted namespaces are returned.
pub async fn list_schemas(
catalog: &dyn Catalog,
) -> Result<impl Iterator<Item = (Namespace, NamespaceSchema)>> {
@ -862,6 +929,9 @@ pub async fn list_schemas(
// queries resolving only what is needed to construct schemas for the
// retrieved columns (ignoring any newly added tables/namespaces since the
// column snapshot was taken).
//
// This approach also tolerates concurrently deleted namespaces, which are
// simply ignored at the end when joining to the namespace query result.
// First fetch all the columns - this is the state snapshot of the catalog
// schemas.
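A minimal usage sketch (the helper name is illustrative; it mirrors test_list_schemas_soft_deleted_rows later in this diff) - schemas for soft-deleted namespaces simply never appear in the result:
// Sketch: soft-deleted namespaces are excluded by the list() call above,
// so they never show up in the returned iterator.
async fn visible_schemas(catalog: &dyn Catalog) -> Result<Vec<(Namespace, NamespaceSchema)>> {
    Ok(list_schemas(catalog).await?.collect())
}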
@ -894,7 +964,12 @@ pub async fn list_schemas(
// Do all the I/O to fetch the namespaces in the background, while this
// thread constructs the NamespaceId->TableSchema map below.
let namespaces = tokio::spawn(async move { repos.namespaces().list().await });
let namespaces = tokio::spawn(async move {
repos
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
});
// A set of tables within a single namespace.
type NamespaceTables = BTreeMap<String, TableSchema>;
@ -928,6 +1003,9 @@ pub async fn list_schemas(
// was created, or have no tables/columns (and therefore have no entry
// in "joined").
.filter_map(move |v| {
// The catalog call explicitly asked for no soft deleted records.
assert!(v.deleted_at.is_none());
let mut ns = NamespaceSchema::new(
v.id,
v.topic_id,
@ -971,6 +1049,7 @@ pub(crate) mod test_helpers {
F: Future<Output = Arc<dyn Catalog>> + Send,
{
test_setup(clean_state().await).await;
test_namespace_soft_deletion(clean_state().await).await;
test_partitions_with_recent_created_files(clean_state().await).await;
test_most_cold_files_partitions(clean_state().await).await;
test_query_pool(clean_state().await).await;
@ -990,6 +1069,7 @@ pub(crate) mod test_helpers {
test_txn_isolation(clean_state().await).await;
test_txn_drop(clean_state().await).await;
test_list_schemas(clean_state().await).await;
test_list_schemas_soft_deleted_rows(clean_state().await).await;
test_delete_namespace(clean_state().await).await;
let catalog = clean_state().await;
@ -1107,7 +1187,7 @@ pub(crate) mod test_helpers {
let found = repos
.namespaces()
.get_by_id(namespace.id)
.get_by_id(namespace.id, SoftDeletedRows::ExcludeDeleted)
.await
.unwrap()
.expect("namespace should be there");
@ -1115,14 +1195,14 @@ pub(crate) mod test_helpers {
let not_found = repos
.namespaces()
.get_by_id(NamespaceId::new(i64::MAX))
.get_by_id(NamespaceId::new(i64::MAX), SoftDeletedRows::ExcludeDeleted)
.await
.unwrap();
assert!(not_found.is_none());
let found = repos
.namespaces()
.get_by_name(namespace_name)
.get_by_name(namespace_name, SoftDeletedRows::ExcludeDeleted)
.await
.unwrap()
.expect("namespace should be there");
@ -1130,7 +1210,7 @@ pub(crate) mod test_helpers {
let not_found = repos
.namespaces()
.get_by_name("does_not_exist")
.get_by_name("does_not_exist", SoftDeletedRows::ExcludeDeleted)
.await
.unwrap();
assert!(not_found.is_none());
@ -1141,7 +1221,11 @@ pub(crate) mod test_helpers {
.create(namespace2_name, None, topic.id, pool.id)
.await
.unwrap();
let mut namespaces = repos.namespaces().list().await.unwrap();
let mut namespaces = repos
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.unwrap();
namespaces.sort_by_key(|ns| ns.name.clone());
assert_eq!(namespaces, vec![namespace, namespace2]);
@ -1214,26 +1298,199 @@ pub(crate) mod test_helpers {
// remove the namespace to keep it from affecting later tests
repos
.namespaces()
.delete("test_namespace")
.soft_delete("test_namespace")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("test_namespace2")
.soft_delete("test_namespace2")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("test_namespace3")
.soft_delete("test_namespace3")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("test_namespace4")
.soft_delete("test_namespace4")
.await
.expect("delete namespace should succeed");
}
/// Construct a set of two namespaces:
///
/// * deleted-ns: marked as soft-deleted
/// * active-ns: not marked as deleted
///
/// And assert the expected "soft delete" semantics correctly filter out
/// the expected rows for all three states of [`SoftDeletedRows`].
async fn test_namespace_soft_deletion(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
let pool = repos.query_pools().create_or_get("foo").await.unwrap();
let deleted_ns = repos
.namespaces()
.create("deleted-ns", None, topic.id, pool.id)
.await
.unwrap();
let active_ns = repos
.namespaces()
.create("active-ns", None, topic.id, pool.id)
.await
.unwrap();
// Mark "deleted-ns" as soft-deleted.
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
// Soft deletion should be idempotent (ignoring the timestamp change -
// the delete was already idempotent before it became a soft delete, so
// that behaviour is preserved).
repos.namespaces().soft_delete("deleted-ns").await.unwrap();
// Listing should respect soft deletion.
let got = repos
.namespaces()
.list(SoftDeletedRows::AllRows)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["deleted-ns", "active-ns"]);
let got = repos
.namespaces()
.list(SoftDeletedRows::OnlyDeleted)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["deleted-ns"]);
let got = repos
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["active-ns"]);
// As should get by ID
let got = repos
.namespaces()
.get_by_id(deleted_ns.id, SoftDeletedRows::AllRows)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["deleted-ns"]);
let got = repos
.namespaces()
.get_by_id(deleted_ns.id, SoftDeletedRows::OnlyDeleted)
.await
.unwrap()
.into_iter()
.map(|v| {
assert!(v.deleted_at.is_some());
v.name
});
assert_string_set_eq(got, ["deleted-ns"]);
let got = repos
.namespaces()
.get_by_id(deleted_ns.id, SoftDeletedRows::ExcludeDeleted)
.await
.unwrap();
assert!(got.is_none());
let got = repos
.namespaces()
.get_by_id(active_ns.id, SoftDeletedRows::AllRows)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["active-ns"]);
let got = repos
.namespaces()
.get_by_id(active_ns.id, SoftDeletedRows::OnlyDeleted)
.await
.unwrap();
assert!(got.is_none());
let got = repos
.namespaces()
.get_by_id(active_ns.id, SoftDeletedRows::ExcludeDeleted)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["active-ns"]);
// And get by name
let got = repos
.namespaces()
.get_by_name(&deleted_ns.name, SoftDeletedRows::AllRows)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["deleted-ns"]);
let got = repos
.namespaces()
.get_by_name(&deleted_ns.name, SoftDeletedRows::OnlyDeleted)
.await
.unwrap()
.into_iter()
.map(|v| {
assert!(v.deleted_at.is_some());
v.name
});
assert_string_set_eq(got, ["deleted-ns"]);
let got = repos
.namespaces()
.get_by_name(&deleted_ns.name, SoftDeletedRows::ExcludeDeleted)
.await
.unwrap();
assert!(got.is_none());
let got = repos
.namespaces()
.get_by_name(&active_ns.name, SoftDeletedRows::AllRows)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["active-ns"]);
let got = repos
.namespaces()
.get_by_name(&active_ns.name, SoftDeletedRows::OnlyDeleted)
.await
.unwrap();
assert!(got.is_none());
let got = repos
.namespaces()
.get_by_name(&active_ns.name, SoftDeletedRows::ExcludeDeleted)
.await
.unwrap()
.into_iter()
.map(|v| v.name);
assert_string_set_eq(got, ["active-ns"]);
}
// Assert the set of strings "a" is equal to the set "b", tolerating
// duplicates.
#[track_caller]
fn assert_string_set_eq<T, U>(a: impl IntoIterator<Item = T>, b: impl IntoIterator<Item = U>)
where
T: Into<String>,
U: Into<String>,
{
let mut a = a.into_iter().map(Into::into).collect::<Vec<String>>();
a.sort_unstable();
let mut b = b.into_iter().map(Into::into).collect::<Vec<String>>();
b.sort_unstable();
assert_eq!(a, b);
}
async fn test_table(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
@ -1363,12 +1620,12 @@ pub(crate) mod test_helpers {
repos
.namespaces()
.delete("namespace_table_test")
.soft_delete("namespace_table_test")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("two")
.soft_delete("two")
.await
.expect("delete namespace should succeed");
}
@ -1522,7 +1779,7 @@ pub(crate) mod test_helpers {
repos
.namespaces()
.delete("namespace_column_test")
.soft_delete("namespace_column_test")
.await
.expect("delete namespace should succeed");
}
@ -1950,12 +2207,12 @@ pub(crate) mod test_helpers {
repos
.namespaces()
.delete("namespace_partition_test2")
.soft_delete("namespace_partition_test2")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("namespace_partition_test")
.soft_delete("namespace_partition_test")
.await
.expect("delete namespace should succeed");
}
@ -2124,12 +2381,12 @@ pub(crate) mod test_helpers {
repos
.namespaces()
.delete("namespace_tombstone_test2")
.soft_delete("namespace_tombstone_test2")
.await
.expect("delete namespace should succeed");
repos
.namespaces()
.delete("namespace_tombstone_test")
.soft_delete("namespace_tombstone_test")
.await
.expect("delete namespace should succeed");
}
@ -2355,7 +2612,7 @@ pub(crate) mod test_helpers {
repos
.namespaces()
.delete("namespace_tombstones_by_parquet_file_test")
.soft_delete("namespace_tombstones_by_parquet_file_test")
.await
.expect("delete namespace should succeed");
}
@ -2470,7 +2727,7 @@ pub(crate) mod test_helpers {
(catalog.time_provider().now() + Duration::from_secs(100)).timestamp_nanos(),
);
let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap();
assert!(deleted_files.is_empty());
assert_matches!(deleted_files.as_slice(), []);
assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap());
// verify to_delete can be updated to a timestamp
@ -2793,7 +3050,11 @@ pub(crate) mod test_helpers {
// test retention-based flagging for deletion
// Since the mem catalog has a default retention of 1 hour, first set it to 0, which means infinite
let namespaces = repos.namespaces().list().await.expect("listing namespaces");
let namespaces = repos
.namespaces()
.list(SoftDeletedRows::AllRows)
.await
.expect("listing namespaces");
for namespace in namespaces {
repos
.namespaces()
@ -3086,7 +3347,7 @@ pub(crate) mod test_helpers {
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("namespace_parquet_file_compaction_level_0_test")
.soft_delete("namespace_parquet_file_compaction_level_0_test")
.await
.expect("delete namespace should succeed");
}
@ -3320,7 +3581,7 @@ pub(crate) mod test_helpers {
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("namespace_parquet_file_compaction_level_1_test")
.soft_delete("namespace_parquet_file_compaction_level_1_test")
.await
.expect("delete namespace should succeed");
}
@ -3962,7 +4223,7 @@ pub(crate) mod test_helpers {
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("test_most_level_0_files_partitions")
.soft_delete("test_most_level_0_files_partitions")
.await
.expect("delete namespace should succeed");
@ -4341,7 +4602,7 @@ pub(crate) mod test_helpers {
// drop the namespace to avoid the data created in this test from affecting other tests
repos
.namespaces()
.delete("test_partitions_with_recent_created_files")
.soft_delete("test_partitions_with_recent_created_files")
.await
.expect("delete namespace should succeed");
}
@ -5017,7 +5278,7 @@ pub(crate) mod test_helpers {
// remove the namespace to keep it from affecting later tests
repos
.namespaces()
.delete("test_partitions_with_small_l1_file_count")
.soft_delete("test_partitions_with_small_l1_file_count")
.await
.expect("delete namespace should succeed");
}
@ -5146,7 +5407,7 @@ pub(crate) mod test_helpers {
// remove the namespace to keep it from affecting later tests
repos
.namespaces()
.delete("namespace_parquet_file_test_list_by_partiton_not_to_delete")
.soft_delete("namespace_parquet_file_test_list_by_partiton_not_to_delete")
.await
.expect("delete namespace should succeed");
}
@ -5279,7 +5540,7 @@ pub(crate) mod test_helpers {
// remove the namespace to keep it from affecting later tests
repos
.namespaces()
.delete("namespace_update_to_compaction_level_1_test")
.soft_delete("namespace_update_to_compaction_level_1_test")
.await
.expect("delete namespace should succeed");
}
@ -5465,11 +5726,24 @@ pub(crate) mod test_helpers {
// remove the namespace to keep it from affecting later tests
repos
.namespaces()
.delete("namespace_processed_tombstone_test")
.soft_delete("namespace_processed_tombstone_test")
.await
.expect("delete namespace should succeed");
}
/// Assert that a namespace deletion does NOT cascade to the tables/schema
/// items/parquet files/etc.
///
/// Removal of these entities breaks the invariant that once created, a row
/// always exists for the lifetime of an IOx process, and causes the system
/// to panic in multiple components. It's also ineffective, because most
/// components maintain a cache of at least one of these entities.
///
/// Instead, soft-deleted namespaces should have their files GC'd like a
/// normal parquet file deletion, removing the rows once they're no longer
/// being actively used by the system. This is done by waiting a long time
/// before deleting records, and whilst it isn't perfect, it is largely
/// effective.
async fn test_delete_namespace(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("foo").await.unwrap();
@ -5743,26 +6017,47 @@ pub(crate) mod test_helpers {
.unwrap();
assert!(exist);
// now delete namespace_1 and assert it's all gone and none of namespace_2 is gone
// now delete namespace_1 and assert it's all gone and none of
// namespace_2 is gone
repos
.namespaces()
.delete("namespace_test_delete_namespace_1")
.soft_delete("namespace_test_delete_namespace_1")
.await
.expect("delete namespace should succeed");
// assert that namespace, table, column, tombstones, parquet files and processed tombstones
// are all gone
// assert that namespace is soft-deleted, but the table, column,
// tombstones, parquet files and processed tombstones are all still
// there.
assert!(repos
.namespaces()
.get_by_id(namespace_1.id)
.get_by_id(namespace_1.id, SoftDeletedRows::ExcludeDeleted)
.await
.expect("get namespace should succeed")
.is_none());
assert!(repos
assert_eq!(
repos
.namespaces()
.get_by_id(namespace_1.id, SoftDeletedRows::AllRows)
.await
.expect("get namespace should succeed")
.map(|mut v| {
// The only change after soft-deletion should be the deleted_at
// field being set - this block normalises that field, so that
// the before/after can be asserted as equal.
v.deleted_at = None;
v
})
.expect("should see soft-deleted row"),
namespace_1
);
assert_eq!(
repos
.tables()
.get_by_id(table_1.id)
.await
.expect("get table should succeed")
.is_none());
.expect("should return row"),
table_1
);
assert_eq!(
repos
.columns()
@ -5770,7 +6065,7 @@ pub(crate) mod test_helpers {
.await
.expect("listing columns should succeed")
.len(),
0
1
);
assert_eq!(
repos
@ -5779,7 +6074,7 @@ pub(crate) mod test_helpers {
.await
.expect("listing columns should succeed")
.len(),
0
1
);
assert_eq!(
repos
@ -5788,7 +6083,7 @@ pub(crate) mod test_helpers {
.await
.expect("listing tombstones should succeed")
.len(),
0
3
);
assert_eq!(
repos
@ -5797,45 +6092,45 @@ pub(crate) mod test_helpers {
.await
.expect("listing tombstones should succeed")
.len(),
0
3
);
assert!(repos
.partitions()
.get_by_id(partition_1.id)
.await
.expect("fetching partition by id should succeed")
.is_none());
assert!(!repos
.is_some());
assert!(repos
.parquet_files()
.exist(p1_n1.id)
.await
.expect("parquet file exists check should succeed"));
assert!(!repos
assert!(repos
.parquet_files()
.exist(p2_n1.id)
.await
.expect("parquet file exists check should succeed"));
assert!(!repos
assert!(repos
.processed_tombstones()
.exist(p1_n1.id, t2_n1.id)
.await
.expect("processed tombstone exists check should succeed"));
assert!(!repos
assert!(repos
.processed_tombstones()
.exist(p1_n1.id, t3_n1.id)
.await
.expect("processed tombstone exists check should succeed"));
assert!(!repos
assert!(repos
.processed_tombstones()
.exist(p2_n1.id, t3_n1.id)
.await
.expect("processed tombstone exists check should succeed"));
// assert that the namespace, table, column, tombstone, parquet files and processed
// tombstones for namespace_2 are still there
// assert that the namespace, table, column, tombstone, parquet files
// and processed tombstones for namespace_2 are still there
assert!(repos
.namespaces()
.get_by_id(namespace_2.id)
.get_by_id(namespace_2.id, SoftDeletedRows::ExcludeDeleted)
.await
.expect("get namespace should succeed")
.is_some());
@ -5994,7 +6289,7 @@ pub(crate) mod test_helpers {
Ok(v) => v,
Err(Error::NameExists { .. }) => repos
.namespaces()
.get_by_name(namespace_name)
.get_by_name(namespace_name, SoftDeletedRows::AllRows)
.await
.unwrap()
.unwrap(),
@ -6048,6 +6343,40 @@ pub(crate) mod test_helpers {
assert!(got.contains(&ns2), "{:#?}\n\nwant{:#?}", got, &ns2);
}
async fn test_list_schemas_soft_deleted_rows(catalog: Arc<dyn Catalog>) {
let mut repos = catalog.repositories().await;
let ns1 = populate_namespace(
repos.deref_mut(),
"ns1",
"cpu,tag=1 field=1i\nanother,tag=1 field=1.0",
)
.await;
let ns2 = populate_namespace(
repos.deref_mut(),
"ns2",
"cpu,tag=1 field=1i\nsomethingelse field=1u",
)
.await;
repos
.namespaces()
.soft_delete(&ns2.0.name)
.await
.expect("failed to soft delete namespace");
// Otherwise the in-mem catalog deadlocks.... (but not postgres)
drop(repos);
let got = list_schemas(&*catalog)
.await
.expect("should be able to list the schemas")
.collect::<Vec<_>>();
assert!(got.contains(&ns1), "{:#?}\n\nwant{:#?}", got, &ns1);
assert!(!got.contains(&ns2), "{:#?}\n\n do not want{:#?}", got, &ns2);
}
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
let histogram = metrics
.get_instrument::<Metric<DurationHistogram>>("catalog_op_duration")

View File

@ -239,7 +239,7 @@ mod tests {
use std::sync::Arc;
use super::*;
use crate::interface::get_schema_by_name;
use crate::interface::{get_schema_by_name, SoftDeletedRows};
use crate::mem::MemCatalog;
// Generate a test that simulates multiple, sequential writes in `lp` and
@ -317,7 +317,7 @@ mod tests {
// Invariant: in absence of concurrency, the schema within
// the database must always match the incrementally built
// cached schema.
let db_schema = get_schema_by_name(NAMESPACE_NAME, txn.deref_mut())
let db_schema = get_schema_by_name(NAMESPACE_NAME, txn.deref_mut(), SoftDeletedRows::ExcludeDeleted)
.await
.expect("database failed to query for namespace schema");
assert_eq!(schema, db_schema, "schema in DB and cached schema differ");

View File

@ -5,8 +5,8 @@ use crate::{
interface::{
sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo, ColumnTypeMismatchSnafu,
Error, NamespaceRepo, ParquetFileRepo, PartitionRepo, ProcessedTombstoneRepo,
QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo, TombstoneRepo,
TopicMetadataRepo, Transaction,
QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows, TableRepo,
TombstoneRepo, TopicMetadataRepo, Transaction,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES,
@ -311,77 +311,54 @@ impl NamespaceRepo for MemTxn {
Ok(stage.namespaces.last().unwrap().clone())
}
async fn list(&mut self) -> Result<Vec<Namespace>> {
async fn list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>> {
let stage = self.stage();
Ok(stage.namespaces.clone())
Ok(filter_namespace_soft_delete(&stage.namespaces, deleted)
.cloned()
.collect())
}
async fn get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>> {
async fn get_by_id(
&mut self,
id: NamespaceId,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>> {
let stage = self.stage();
Ok(stage.namespaces.iter().find(|n| n.id == id).cloned())
Ok(filter_namespace_soft_delete(&stage.namespaces, deleted)
.find(|n| n.id == id)
.cloned())
}
async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
async fn get_by_name(
&mut self,
name: &str,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>> {
let stage = self.stage();
Ok(stage.namespaces.iter().find(|n| n.name == name).cloned())
Ok(filter_namespace_soft_delete(&stage.namespaces, deleted)
.find(|n| n.name == name)
.cloned())
}
// performs a cascading delete of all things attached to the namespace, then deletes the
// namespace
async fn delete(&mut self, name: &str) -> Result<()> {
async fn soft_delete(&mut self, name: &str) -> Result<()> {
let timestamp = self.time_provider.now();
let stage = self.stage();
// get namespace by name
let namespace_id = match stage.namespaces.iter().find(|n| n.name == name) {
Some(n) => n.id,
None => {
return Err(Error::NamespaceNotFoundByName {
name: name.to_string(),
})
}
};
// get list of parquet files that match the namespace id
let parquet_file_ids: Vec<_> = stage
.parquet_files
.iter()
.filter_map(|f| (f.namespace_id == namespace_id).then_some(f.id))
.collect();
// delete all processed tombstones for those parquet files
stage
.processed_tombstones
.retain(|pt| !parquet_file_ids.iter().any(|id| *id == pt.parquet_file_id));
// delete all the parquet files
stage
.parquet_files
.retain(|pf| !parquet_file_ids.iter().any(|id| *id == pf.id));
// get tables with that namespace id
let table_ids: HashSet<_> = stage
.tables
.iter()
.filter_map(|table| (table.namespace_id == namespace_id).then_some(table.id))
.collect();
// delete partitions for those tables
stage
.partitions
.retain(|p| !table_ids.iter().any(|id| *id == p.table_id));
// delete tombstones for those tables
stage
.tombstones
.retain(|t| !table_ids.iter().any(|id| *id == t.table_id));
// delete columns for those tables
stage
.columns
.retain(|c| !table_ids.iter().any(|id| *id == c.table_id));
// delete those tables
stage
.tables
.retain(|t| !table_ids.iter().any(|id| *id == t.id));
// finally, delete the namespace
stage.namespaces.retain(|n| n.id != namespace_id);
match stage.namespaces.iter_mut().find(|n| n.name == name) {
Some(n) => {
n.deleted_at = Some(Timestamp::from(timestamp));
Ok(())
}
None => Err(Error::NamespaceNotFoundByName {
name: name.to_string(),
}),
}
}
async fn update_table_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace> {
let stage = self.stage();
@ -1788,6 +1765,17 @@ impl ProcessedTombstoneRepo for MemTxn {
}
}
fn filter_namespace_soft_delete<'a>(
v: impl IntoIterator<Item = &'a Namespace>,
deleted: SoftDeletedRows,
) -> impl Iterator<Item = &'a Namespace> {
v.into_iter().filter(move |v| match deleted {
SoftDeletedRows::AllRows => true,
SoftDeletedRows::ExcludeDeleted => v.deleted_at.is_none(),
SoftDeletedRows::OnlyDeleted => v.deleted_at.is_some(),
})
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -3,7 +3,7 @@
use crate::interface::{
sealed::TransactionFinalize, CasFailure, ColumnRepo, NamespaceRepo, ParquetFileRepo,
PartitionRepo, ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo,
TableRepo, TombstoneRepo, TopicMetadataRepo,
SoftDeletedRows, TableRepo, TombstoneRepo, TopicMetadataRepo,
};
use async_trait::async_trait;
use data_types::{
@ -195,10 +195,10 @@ decorate!(
methods = [
"namespace_create" = create(&mut self, name: &str, retention_period_ns: Option<i64>, topic_id: TopicId, query_pool_id: QueryPoolId) -> Result<Namespace>;
"namespace_update_retention_period" = update_retention_period(&mut self, name: &str, retention_period_ns: Option<i64>) -> Result<Namespace>;
"namespace_list" = list(&mut self) -> Result<Vec<Namespace>>;
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>>;
"namespace_get_by_name" = get_by_name(&mut self, name: &str) -> Result<Option<Namespace>>;
"namespace_delete" = delete(&mut self, name: &str) -> Result<()>;
"namespace_list" = list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>>;
"namespace_get_by_id" = get_by_id(&mut self, id: NamespaceId, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;
"namespace_get_by_name" = get_by_name(&mut self, name: &str, deleted: SoftDeletedRows) -> Result<Option<Namespace>>;
"namespace_soft_delete" = soft_delete(&mut self, name: &str) -> Result<()>;
"namespace_update_table_limit" = update_table_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace>;
"namespace_update_column_limit" = update_column_limit(&mut self, name: &str, new_max: i32) -> Result<Namespace>;
]

View File

@ -4,8 +4,8 @@ use crate::{
interface::{
self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo,
ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo,
ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo,
TombstoneRepo, TopicMetadataRepo, Transaction,
ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows,
TableRepo, TombstoneRepo, TopicMetadataRepo, Transaction,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@ -662,12 +662,13 @@ impl NamespaceRepo for PostgresTxn {
Ok(rec)
}
async fn list(&mut self) -> Result<Vec<Namespace>> {
async fn list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT *
FROM namespace;
"#,
format!(
r#"SELECT * FROM namespace WHERE {v};"#,
v = deleted.as_sql_predicate()
)
.as_str(),
)
.fetch_all(&mut self.inner)
.await
@ -676,13 +677,17 @@ FROM namespace;
Ok(rec)
}
async fn get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>> {
async fn get_by_id(
&mut self,
id: NamespaceId,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT *
FROM namespace
WHERE id = $1;
"#,
format!(
r#"SELECT * FROM namespace WHERE id=$1 AND {v};"#,
v = deleted.as_sql_predicate()
)
.as_str(),
)
.bind(id) // $1
.fetch_one(&mut self.inner)
@ -697,13 +702,17 @@ WHERE id = $1;
Ok(Some(namespace))
}
async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
async fn get_by_name(
&mut self,
name: &str,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT *
FROM namespace
WHERE name = $1;
"#,
format!(
r#"SELECT * FROM namespace WHERE name=$1 AND {v};"#,
v = deleted.as_sql_predicate()
)
.as_str(),
)
.bind(name) // $1
.fetch_one(&mut self.inner)
@ -718,15 +727,13 @@ WHERE name = $1;
Ok(Some(namespace))
}
async fn delete(&mut self, name: &str) -> Result<()> {
async fn soft_delete(&mut self, name: &str) -> Result<()> {
let flagged_at = Timestamp::from(self.time_provider.now());
// note that there is a uniqueness constraint on the name column in the DB
sqlx::query(
r#"
DELETE FROM namespace
WHERE name = $1;
"#,
)
.bind(name)
sqlx::query(r#"UPDATE namespace SET deleted_at=$1 WHERE name = $2;"#)
.bind(flagged_at) // $1
.bind(name) // $2
.execute(&mut self.inner)
.await
.context(interface::CouldNotDeleteNamespaceSnafu)

View File

@ -4,8 +4,8 @@ use crate::{
interface::{
self, sealed::TransactionFinalize, CasFailure, Catalog, ColumnRepo,
ColumnTypeMismatchSnafu, Error, NamespaceRepo, ParquetFileRepo, PartitionRepo,
ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, TableRepo,
TombstoneRepo, TopicMetadataRepo, Transaction,
ProcessedTombstoneRepo, QueryPoolRepo, RepoCollection, Result, ShardRepo, SoftDeletedRows,
TableRepo, TombstoneRepo, TopicMetadataRepo, Transaction,
},
metrics::MetricDecorator,
DEFAULT_MAX_COLUMNS_PER_TABLE, DEFAULT_MAX_TABLES, SHARED_TOPIC_ID, SHARED_TOPIC_NAME,
@ -441,12 +441,13 @@ impl NamespaceRepo for SqliteTxn {
Ok(rec)
}
async fn list(&mut self) -> Result<Vec<Namespace>> {
async fn list(&mut self, deleted: SoftDeletedRows) -> Result<Vec<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT *
FROM namespace;
"#,
format!(
r#"SELECT * FROM namespace WHERE {v};"#,
v = deleted.as_sql_predicate()
)
.as_str(),
)
.fetch_all(self.inner.get_mut())
.await
@ -455,13 +456,17 @@ FROM namespace;
Ok(rec)
}
async fn get_by_id(&mut self, id: NamespaceId) -> Result<Option<Namespace>> {
async fn get_by_id(
&mut self,
id: NamespaceId,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT *
FROM namespace
WHERE id = $1;
"#,
format!(
r#"SELECT * FROM namespace WHERE id=$1 AND {v};"#,
v = deleted.as_sql_predicate()
)
.as_str(),
)
.bind(id) // $1
.fetch_one(self.inner.get_mut())
@ -476,13 +481,17 @@ WHERE id = $1;
Ok(Some(namespace))
}
async fn get_by_name(&mut self, name: &str) -> Result<Option<Namespace>> {
async fn get_by_name(
&mut self,
name: &str,
deleted: SoftDeletedRows,
) -> Result<Option<Namespace>> {
let rec = sqlx::query_as::<_, Namespace>(
r#"
SELECT *
FROM namespace
WHERE name = $1;
"#,
format!(
r#"SELECT * FROM namespace WHERE name=$1 AND {v};"#,
v = deleted.as_sql_predicate()
)
.as_str(),
)
.bind(name) // $1
.fetch_one(self.inner.get_mut())
@ -497,15 +506,13 @@ WHERE name = $1;
Ok(Some(namespace))
}
async fn delete(&mut self, name: &str) -> Result<()> {
async fn soft_delete(&mut self, name: &str) -> Result<()> {
let flagged_at = Timestamp::from(self.time_provider.now());
// note that there is a uniqueness constraint on the name column in the DB
sqlx::query(
r#"
DELETE FROM namespace
WHERE name = $1;
"#,
)
.bind(name)
sqlx::query(r#"UPDATE namespace SET deleted_at=$1 WHERE name = $2;"#)
.bind(flagged_at) // $1
.bind(name) // $2
.execute(self.inner.get_mut())
.await
.context(interface::CouldNotDeleteNamespaceSnafu)

View File

@ -13,7 +13,9 @@ use data_types::{
use datafusion::physical_plan::metrics::Count;
use datafusion_util::MemoryStream;
use iox_catalog::{
interface::{get_schema_by_id, get_table_schema_by_id, Catalog, PartitionRepo},
interface::{
get_schema_by_id, get_table_schema_by_id, Catalog, PartitionRepo, SoftDeletedRows,
},
mem::MemCatalog,
};
use iox_query::{
@ -342,7 +344,11 @@ impl TestNamespace {
/// Get namespace schema for this namespace.
pub async fn schema(&self) -> NamespaceSchema {
let mut repos = self.catalog.catalog.repositories().await;
get_schema_by_id(self.namespace.id, repos.as_mut())
get_schema_by_id(
self.namespace.id,
repos.as_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.unwrap()
}

View File

@ -14,7 +14,7 @@ use cache_system::{
resource_consumption::FunctionEstimator,
};
use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableSchema};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use iox_time::TimeProvider;
use schema::Schema;
use std::{
@ -91,7 +91,13 @@ impl NamespaceCache {
let schema = Backoff::new(&backoff_config)
.retry_all_errors("get namespace schema", || async {
let mut repos = catalog.repositories().await;
match get_schema_by_name(&namespace_name, repos.as_mut()).await {
match get_schema_by_name(
&namespace_name,
repos.as_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
{
Ok(schema) => Ok(Some(schema)),
Err(iox_catalog::interface::Error::NamespaceNotFoundByName {
..

View File

@ -7,7 +7,7 @@ use crate::{
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{Namespace, ShardIndex};
use iox_catalog::interface::Catalog;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use iox_query::exec::Executor;
use service_common::QueryNamespaceProvider;
use sharder::JumpHash;
@ -189,7 +189,12 @@ impl QuerierDatabase {
let catalog = &self.catalog_cache.catalog();
Backoff::new(&self.backoff_config)
.retry_all_errors("listing namespaces", || async {
catalog.repositories().await.namespaces().list().await
catalog
.repositories()
.await
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
})
.await
.expect("retry forever")

View File

@ -3,7 +3,7 @@ use crate::{
cache::namespace::CachedNamespace, create_ingester_connection_for_testing, QuerierCatalogCache,
};
use data_types::{ShardIndex, TableId};
use iox_catalog::interface::get_schema_by_name;
use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows};
use iox_query::exec::ExecutorType;
use iox_tests::TestNamespace;
use sharder::JumpHash;
@ -13,7 +13,11 @@ use tokio::runtime::Handle;
/// Create [`QuerierNamespace`] for testing.
pub async fn querier_namespace(ns: &Arc<TestNamespace>) -> QuerierNamespace {
let mut repos = ns.catalog.catalog.repositories().await;
let schema = get_schema_by_name(&ns.namespace.name, repos.as_mut())
let schema = get_schema_by_name(
&ns.namespace.name,
repos.as_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.unwrap();
let cached_ns = Arc::new(CachedNamespace::from(schema));

View File

@ -5,7 +5,7 @@ use crate::{
};
use arrow::record_batch::RecordBatch;
use data_types::{ChunkId, SequenceNumber, ShardIndex};
use iox_catalog::interface::get_schema_by_name;
use iox_catalog::interface::{get_schema_by_name, SoftDeletedRows};
use iox_tests::{TestCatalog, TestPartition, TestShard, TestTable};
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::{sort::SortKey, Projection, Schema};
@ -29,7 +29,11 @@ pub async fn querier_table(catalog: &Arc<TestCatalog>, table: &Arc<TestTable>) -
));
let mut repos = catalog.catalog.repositories().await;
let mut catalog_schema = get_schema_by_name(&table.namespace.namespace.name, repos.as_mut())
let mut catalog_schema = get_schema_by_name(
&table.namespace.namespace.name,
repos.as_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.unwrap();
let schema = catalog_schema.tables.remove(&table.table.name).unwrap();

View File

@ -3,7 +3,7 @@ use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use hashbrown::HashMap;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use iox_time::{SystemProvider, TimeProvider};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
@ -75,7 +75,11 @@ where
None => {
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(namespace, repos.deref_mut())
let schema = get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName, NamespaceSchema, TableId};
use hashbrown::HashMap;
use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError},
interface::{get_schema_by_name, Catalog, Error as CatalogError, SoftDeletedRows},
validate_or_insert_schema,
};
use metric::U64Counter;
@ -186,7 +186,11 @@ where
None => {
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(namespace, repos.deref_mut())
let schema = get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(

View File

@ -5,7 +5,7 @@ use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
use thiserror::Error;
@ -72,7 +72,11 @@ where
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(namespace, repos.deref_mut())
let schema = get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(
@ -146,7 +150,7 @@ mod tests {
assert!(
repos
.namespaces()
.get_by_name(ns.as_str())
.get_by_name(ns.as_str(), SoftDeletedRows::ExcludeDeleted)
.await
.expect("lookup should not error")
.is_none(),
@ -185,6 +189,46 @@ mod tests {
assert!(cache.get_schema(&ns).is_some());
}
#[tokio::test]
async fn test_cache_miss_soft_deleted() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
// Create the namespace in the catalog and mark it as deleted
{
let mut repos = catalog.repositories().await;
let topic = repos.topics().create_or_get("bananas").await.unwrap();
let query_pool = repos.query_pools().create_or_get("platanos").await.unwrap();
repos
.namespaces()
.create(&ns, None, topic.id, query_pool.id)
.await
.expect("failed to setup catalog state");
repos
.namespaces()
.soft_delete(&ns)
.await
.expect("failed to setup catalog state");
}
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
let err = resolver
.get_namespace_id(&ns)
.await
.expect_err("lookup should error");
assert_matches!(
err,
Error::Lookup(iox_catalog::interface::Error::NamespaceNotFoundByName { .. })
);
// The cache should NOT be populated as a result of the lookup.
assert!(cache.get_schema(&ns).is_none());
}
#[tokio::test]
async fn test_cache_miss_does_not_exist() {
let ns = NamespaceName::try_from("bananas").unwrap();

View File

@ -149,7 +149,7 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{Namespace, NamespaceId, NamespaceSchema};
use iox_catalog::mem::MemCatalog;
use iox_catalog::{interface::SoftDeletedRows, mem::MemCatalog};
use super::*;
use crate::{
@ -206,7 +206,7 @@ mod tests {
assert!(
repos
.namespaces()
.get_by_name(ns.as_str())
.get_by_name(ns.as_str(), SoftDeletedRows::ExcludeDeleted)
.await
.expect("lookup should not error")
.is_none(),
@ -241,7 +241,7 @@ mod tests {
let mut repos = catalog.repositories().await;
let got = repos
.namespaces()
.get_by_name(ns.as_str())
.get_by_name(ns.as_str(), SoftDeletedRows::ExcludeDeleted)
.await
.expect("lookup should not error")
.expect("creation request should be sent to catalog");
@ -289,7 +289,13 @@ mod tests {
// Make double-sure it wasn't created in the catalog
let mut repos = catalog.repositories().await;
assert_matches!(repos.namespaces().get_by_name(ns.as_str()).await, Ok(None));
assert_matches!(
repos
.namespaces()
.get_by_name(ns.as_str(), SoftDeletedRows::ExcludeDeleted)
.await,
Ok(None)
);
}
#[tokio::test]

View File

@ -3,7 +3,10 @@ use std::{collections::BTreeSet, iter, string::String, sync::Arc};
use data_types::{PartitionTemplate, QueryPoolId, TableId, TemplatePart, TopicId};
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use iox_catalog::{
interface::{Catalog, SoftDeletedRows},
mem::MemCatalog,
};
use metric::Registry;
use mutable_batch::MutableBatch;
use object_store::memory::InMemory;
@ -185,7 +188,7 @@ impl TestContext {
let mut repos = self.catalog.repositories().await;
let namespace_id = repos
.namespaces()
.get_by_name(namespace)
.get_by_name(namespace, SoftDeletedRows::AllRows)
.await
.expect("query failed")
.expect("namespace does not exist")

View File

@ -5,6 +5,7 @@ use generated_types::influxdata::iox::namespace::v1::{
namespace_service_server::NamespaceService, *,
};
use hyper::StatusCode;
use iox_catalog::interface::SoftDeletedRows;
use iox_time::{SystemProvider, TimeProvider};
use router::{
dml_handlers::{DmlError, RetentionError},
@ -54,7 +55,7 @@ async fn test_namespace_create() {
.repositories()
.await
.namespaces()
.list()
.list(SoftDeletedRows::AllRows)
.await
.expect("failed to query for existing namespaces");
assert!(current.is_empty());
@ -114,13 +115,14 @@ async fn test_namespace_create() {
.repositories()
.await
.namespaces()
.list()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), got.id);
assert_eq!(ns.name, got.name);
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
assert!(ns.deleted_at.is_none());
});
}
@ -179,13 +181,14 @@ async fn test_create_namespace_0_retention_period() {
.repositories()
.await
.namespaces()
.list()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), got.id);
assert_eq!(ns.name, got.name);
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
assert!(ns.deleted_at.is_none());
});
}
@ -237,7 +240,7 @@ async fn test_create_namespace_negative_retention_period() {
.repositories()
.await
.namespaces()
.list()
.list(SoftDeletedRows::AllRows)
.await
.expect("query failure");
assert!(db_list.is_empty());
@ -321,13 +324,14 @@ async fn test_update_namespace_0_retention_period() {
.repositories()
.await
.namespaces()
.list()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {
assert_eq!(ns.id.get(), got.id);
assert_eq!(ns.name, got.name);
assert_eq!(ns.retention_period_ns, got.retention_period_ns);
assert!(ns.deleted_at.is_none());
});
}
@ -405,7 +409,7 @@ async fn test_update_namespace_negative_retention_period() {
.repositories()
.await
.namespaces()
.list()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.expect("query failure");
assert_matches!(db_list.as_slice(), [ns] => {

View File

@ -6,6 +6,7 @@ use dml::DmlOperation;
use futures::{stream::FuturesUnordered, StreamExt};
use hashbrown::HashMap;
use hyper::{Body, Request, StatusCode};
use iox_catalog::interface::SoftDeletedRows;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use router::dml_handlers::{DmlError, RetentionError, SchemaError};
@ -44,7 +45,7 @@ async fn test_write_ok() {
.repositories()
.await
.namespaces()
.get_by_name("bananas_test")
.get_by_name("bananas_test", SoftDeletedRows::ExcludeDeleted)
.await
.expect("query should succeed")
.expect("namespace not found");

View File

@ -15,7 +15,7 @@
use data_types::{PartitionId, TableId};
use generated_types::influxdata::iox::catalog::v1::*;
use iox_catalog::interface::Catalog;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
use std::sync::Arc;
use tonic::{Request, Response, Status};
@ -90,7 +90,7 @@ impl catalog_service_server::CatalogService for CatalogService {
let namespace = repos
.namespaces()
.get_by_name(&req.namespace_name)
.get_by_name(&req.namespace_name, SoftDeletedRows::ExcludeDeleted)
.await
.map_err(|e| Status::unknown(e.to_string()))?
.ok_or_else(|| {
@ -136,7 +136,7 @@ impl catalog_service_server::CatalogService for CatalogService {
let namespace = repos
.namespaces()
.get_by_name(&req.namespace_name)
.get_by_name(&req.namespace_name, SoftDeletedRows::ExcludeDeleted)
.await
.map_err(|e| Status::unknown(e.to_string()))?
.ok_or_else(|| {

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use data_types::{Namespace as CatalogNamespace, QueryPoolId, TopicId};
use generated_types::influxdata::iox::namespace::v1::*;
use iox_catalog::interface::Catalog;
use iox_catalog::interface::{Catalog, SoftDeletedRows};
use observability_deps::tracing::{debug, info, warn};
use tonic::{Request, Response, Status};
@ -39,7 +39,11 @@ impl namespace_service_server::NamespaceService for NamespaceService {
) -> Result<Response<GetNamespacesResponse>, Status> {
let mut repos = self.catalog.repositories().await;
let namespaces = repos.namespaces().list().await.map_err(|e| {
let namespaces = repos
.namespaces()
.list(SoftDeletedRows::ExcludeDeleted)
.await
.map_err(|e| {
warn!(error=%e, "failed to retrieve namespaces from catalog");
Status::not_found(e.to_string())
})?;

View File

@ -3,7 +3,7 @@
use std::{ops::DerefMut, sync::Arc};
use generated_types::influxdata::iox::schema::v1::*;
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use observability_deps::tracing::warn;
use tonic::{Request, Response, Status};
@ -29,7 +29,11 @@ impl schema_service_server::SchemaService for SchemaService {
let mut repos = self.catalog.repositories().await;
let req = request.into_inner();
let schema = get_schema_by_name(&req.namespace, repos.deref_mut())
let schema = get_schema_by_name(
&req.namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(error=%e, %req.namespace, "failed to retrieve namespace schema");