refactor: move deleter module into its own crate

feat/implement-gen1-file-retention-period
wayne warren 2025-06-06 19:43:52 -06:00
parent 89fdb6010d
commit 66c81eb00a
16 changed files with 73 additions and 18 deletions

22
Cargo.lock generated
View File

@@ -2779,6 +2779,7 @@ dependencies = [
"influxdb3_catalog", "influxdb3_catalog",
"influxdb3_clap_blocks", "influxdb3_clap_blocks",
"influxdb3_client", "influxdb3_client",
"influxdb3_deleter",
"influxdb3_process", "influxdb3_process",
"influxdb3_processing_engine", "influxdb3_processing_engine",
"influxdb3_server", "influxdb3_server",
@@ -2985,6 +2986,24 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "influxdb3_deleter"
version = "3.2.0-nightly"
dependencies = [
"async-trait",
"futures-util",
"influxdb3_catalog",
"influxdb3_id",
"influxdb3_shutdown",
"iox_time",
"object_store",
"observability_deps",
"parking_lot",
"test-log",
"test_helpers",
"tokio",
]
[[package]] [[package]]
name = "influxdb3_id" name = "influxdb3_id"
version = "3.2.0-nightly" version = "3.2.0-nightly"
@@ -3067,6 +3086,7 @@ dependencies = [
"influxdb3_cache", "influxdb3_cache",
"influxdb3_catalog", "influxdb3_catalog",
"influxdb3_client", "influxdb3_client",
"influxdb3_deleter",
"influxdb3_internal_api", "influxdb3_internal_api",
"influxdb3_py_api", "influxdb3_py_api",
"influxdb3_shutdown", "influxdb3_shutdown",
@@ -3152,6 +3172,7 @@ dependencies = [
"influxdb3_cache", "influxdb3_cache",
"influxdb3_catalog", "influxdb3_catalog",
"influxdb3_client", "influxdb3_client",
"influxdb3_deleter",
"influxdb3_id", "influxdb3_id",
"influxdb3_internal_api", "influxdb3_internal_api",
"influxdb3_process", "influxdb3_process",
@@ -3354,6 +3375,7 @@ dependencies = [
"influxdb-line-protocol", "influxdb-line-protocol",
"influxdb3_cache", "influxdb3_cache",
"influxdb3_catalog", "influxdb3_catalog",
"influxdb3_deleter",
"influxdb3_id", "influxdb3_id",
"influxdb3_internal_api", "influxdb3_internal_api",
"influxdb3_py_api", "influxdb3_py_api",

View File

@@ -6,7 +6,7 @@ members = [
"influxdb3_cache", "influxdb3_cache",
"influxdb3_catalog", "influxdb3_catalog",
"influxdb3_clap_blocks", "influxdb3_clap_blocks",
"influxdb3_client", "influxdb3_client", "influxdb3_deleter",
"influxdb3_id", "influxdb3_id",
"influxdb3_internal_api", "influxdb3_internal_api",
"influxdb3_load_generator", "influxdb3_load_generator",

View File

@@ -32,6 +32,7 @@ influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" } influxdb3_client = { path = "../influxdb3_client" }
influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" } influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" }
influxdb3_deleter = { path = "../influxdb3_deleter" }
influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_processing_engine = {path = "../influxdb3_processing_engine"} influxdb3_processing_engine = {path = "../influxdb3_processing_engine"}
influxdb3_server = { path = "../influxdb3_server" } influxdb3_server = { path = "../influxdb3_server" }

View File

@@ -17,6 +17,7 @@ use influxdb3_clap_blocks::{
socket_addr::SocketAddr, socket_addr::SocketAddr,
tokio::TokioDatafusionConfig, tokio::TokioDatafusionConfig,
}; };
use influxdb3_deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_process::{ use influxdb3_process::{
INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_START_TIME, PROCESS_UUID_STR, INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_START_TIME, PROCESS_UUID_STR,
}; };
@@ -41,7 +42,6 @@ use influxdb3_telemetry::{
use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{ use influxdb3_write::{
WriteBuffer, WriteBuffer,
deleter::{DeleteManager, DeleteManagerArgs},
persister::Persister, persister::Persister,
write_buffer::{ write_buffer::{
WriteBufferImpl, WriteBufferImplArgs, check_mem_and_force_snapshot_loop, WriteBufferImpl, WriteBufferImplArgs, check_mem_and_force_snapshot_loop,

View File

@@ -0,0 +1,25 @@
[package]
name = "influxdb3_deleter"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
futures-util.workspace = true
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
iox_time.workspace = true
object_store.workspace = true
observability_deps.workspace = true
parking_lot.workspace = true
tokio.workspace = true
[lints]
workspace = true
[dev-dependencies]
test-log.workspace = true
test_helpers.workspace = true

View File

@@ -1,15 +1,18 @@
use crate::async_collections; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use iox_time::{Time, TimeProvider};
use object_store::ObjectStore;
use observability_deps::tracing::info;
use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::channel::CatalogUpdateReceiver; use influxdb3_catalog::channel::CatalogUpdateReceiver;
use influxdb3_catalog::log::{CatalogBatch, DatabaseCatalogOp, SoftDeleteTableLog}; use influxdb3_catalog::log::{CatalogBatch, DatabaseCatalogOp, SoftDeleteTableLog};
use influxdb3_catalog::resource::CatalogResource; use influxdb3_catalog::resource::CatalogResource;
use influxdb3_id::{DbId, ParquetFileId, TableId}; use influxdb3_id::{DbId, ParquetFileId, TableId};
use influxdb3_shutdown::ShutdownToken; use influxdb3_shutdown::ShutdownToken;
use iox_time::{Time, TimeProvider};
use object_store::ObjectStore; mod async_collections;
use observability_deps::tracing::info;
use std::sync::Arc;
/// Trait for deleting database objects from object storage. /// Trait for deleting database objects from object storage.
#[async_trait] #[async_trait]
@@ -224,10 +227,10 @@ impl ObjectDeleter for ObjectStoreDeleter {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::async_collections; use super::*;
use crate::deleter::DeleteManager;
use crate::async_collections;
use super::{DeleteManagerArgs, DeleteTask, ObjectDeleter};
use influxdb3_catalog::catalog::{Catalog, HardDeletionTime}; use influxdb3_catalog::catalog::{Catalog, HardDeletionTime};
use influxdb3_catalog::log::FieldDataType; use influxdb3_catalog::log::FieldDataType;
use influxdb3_catalog::resource::CatalogResource; use influxdb3_catalog::resource::CatalogResource;

View File

@@ -20,6 +20,7 @@ iox_http_util.workspace = true
iox_time.workspace = true iox_time.workspace = true
influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" } influxdb3_client = { path = "../influxdb3_client" }
influxdb3_deleter = { path = "../influxdb3_deleter" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_py_api = { path = "../influxdb3_py_api" } influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_types = { path = "../influxdb3_types" } influxdb3_types = { path = "../influxdb3_types" }

View File

@@ -753,11 +753,11 @@ mod tests {
use influxdb3_catalog::CatalogError; use influxdb3_catalog::CatalogError;
use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::log::{TriggerSettings, TriggerSpecificationDefinition}; use influxdb3_catalog::log::{TriggerSettings, TriggerSpecificationDefinition};
use influxdb3_deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_shutdown::ShutdownManager; use influxdb3_shutdown::ShutdownManager;
use influxdb3_sys_events::SysEventStore; use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_write::persister::Persister; use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs}; use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs};
use influxdb3_write::{Precision, WriteBuffer}; use influxdb3_write::{Precision, WriteBuffer};

View File

@@ -38,6 +38,7 @@ influxdb3_authz = { path = "../influxdb3_authz" }
influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_client = { path = "../influxdb3_client" } influxdb3_client = { path = "../influxdb3_client" }
influxdb3_deleter = { path = "../influxdb3_deleter" }
influxdb3_id = { path = "../influxdb3_id" } influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_process = { path = "../influxdb3_process", default-features = false }

View File

@@ -292,6 +292,7 @@ mod tests {
use influxdb3_cache::last_cache::LastCacheProvider; use influxdb3_cache::last_cache::LastCacheProvider;
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::Catalog;
use influxdb3_deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_processing_engine::ProcessingEngineManagerImpl; use influxdb3_processing_engine::ProcessingEngineManagerImpl;
use influxdb3_processing_engine::environment::DisabledManager; use influxdb3_processing_engine::environment::DisabledManager;
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
@@ -299,7 +300,6 @@ mod tests {
use influxdb3_sys_events::SysEventStore; use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore; use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig; use influxdb3_wal::WalConfig;
use influxdb3_write::deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_write::persister::Persister; use influxdb3_write::persister::Persister;
use influxdb3_write::write_buffer::persisted_files::PersistedFiles; use influxdb3_write::write_buffer::persisted_files::PersistedFiles;
use influxdb3_write::{Bufferer, WriteBuffer}; use influxdb3_write::{Bufferer, WriteBuffer};

View File

@@ -785,13 +785,16 @@ mod tests {
parquet_cache::test_cached_obj_store_and_oracle, parquet_cache::test_cached_obj_store_and_oracle,
}; };
use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::Catalog;
use influxdb3_deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_shutdown::ShutdownManager; use influxdb3_shutdown::ShutdownManager;
use influxdb3_sys_events::SysEventStore; use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore; use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{ use influxdb3_write::{
deleter::{DeleteManager, DeleteManagerArgs}, persister::Persister, write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs}, Bufferer, WriteBuffer Bufferer, WriteBuffer,
persister::Persister,
write_buffer::{WriteBufferImpl, WriteBufferImplArgs, persisted_files::PersistedFiles},
}; };
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig}; use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig};
use iox_time::{MockProvider, Time}; use iox_time::{MockProvider, Time};

View File

@@ -6,6 +6,7 @@ use datafusion::assert_batches_sorted_eq;
use futures::TryStreamExt; use futures::TryStreamExt;
use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider}; use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider};
use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::catalog::Catalog;
use influxdb3_deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_server::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl}; use influxdb3_server::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl};
use influxdb3_shutdown::ShutdownManager; use influxdb3_shutdown::ShutdownManager;
@@ -14,7 +15,6 @@ use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{ use influxdb3_write::{
Precision, WriteBuffer, Precision, WriteBuffer,
deleter::{DeleteManager, DeleteManagerArgs},
persister::Persister, persister::Persister,
write_buffer::{WriteBufferImpl, WriteBufferImplArgs}, write_buffer::{WriteBufferImpl, WriteBufferImplArgs},
}; };

View File

@@ -26,6 +26,7 @@ trace.workspace = true
# Local deps # Local deps
influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_deleter = { path = "../influxdb3_deleter" }
influxdb3_id = { path = "../influxdb3_id" } influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_py_api = {path = "../influxdb3_py_api"} influxdb3_py_api = {path = "../influxdb3_py_api"}

View File

@@ -4,9 +4,7 @@
//! data into parquet files that are persisted to object storage. A snapshot file is written that contains the //! data into parquet files that are persisted to object storage. A snapshot file is written that contains the
//! metadata of the parquet files that were written in that snapshot. //! metadata of the parquet files that were written in that snapshot.
pub(crate) mod async_collections;
pub mod chunk; pub mod chunk;
pub mod deleter;
pub mod paths; pub mod paths;
pub mod persister; pub mod persister;
pub mod write_buffer; pub mod write_buffer;

View File

@@ -14,7 +14,6 @@ use crate::{
LastCacheManager, ParquetFile, PersistedSnapshot, PersistedSnapshotVersion, Precision, LastCacheManager, ParquetFile, PersistedSnapshot, PersistedSnapshotVersion, Precision,
WriteBuffer, WriteLineError, WriteBuffer, WriteLineError,
chunk::ParquetChunk, chunk::ParquetChunk,
deleter::DeleteTaskQueuer,
persister::{Persister, PersisterError}, persister::{Persister, PersisterError},
write_buffer::{ write_buffer::{
persisted_files::PersistedFiles, queryable_buffer::QueryableBuffer, persisted_files::PersistedFiles, queryable_buffer::QueryableBuffer,
@@ -41,6 +40,7 @@ use influxdb3_catalog::{
CatalogError, CatalogError,
catalog::{Catalog, DatabaseSchema, Prompt, TableDefinition}, catalog::{Catalog, DatabaseSchema, Prompt, TableDefinition},
}; };
use influxdb3_deleter::DeleteTaskQueuer;
use influxdb3_id::{DbId, TableId}; use influxdb3_id::{DbId, TableId};
use influxdb3_wal::{ use influxdb3_wal::{
Wal, WalConfig, WalFileNotifier, WalOp, Wal, WalConfig, WalFileNotifier, WalOp,
@@ -668,7 +668,6 @@ mod tests {
use super::*; use super::*;
use crate::PersistedSnapshot; use crate::PersistedSnapshot;
use crate::deleter::{DeleteManager, DeleteManagerArgs};
use crate::paths::SnapshotInfoFilePath; use crate::paths::SnapshotInfoFilePath;
use crate::persister::Persister; use crate::persister::Persister;
use crate::test_helpers::WriteBufferTester; use crate::test_helpers::WriteBufferTester;
@@ -683,6 +682,7 @@ mod tests {
use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_catalog::catalog::{CatalogSequenceNumber, HardDeletionTime}; use influxdb3_catalog::catalog::{CatalogSequenceNumber, HardDeletionTime};
use influxdb3_catalog::log::FieldDataType; use influxdb3_catalog::log::FieldDataType;
use influxdb3_deleter::{DeleteManager, DeleteManagerArgs};
use influxdb3_id::{ColumnId, DbId, ParquetFileId}; use influxdb3_id::{ColumnId, DbId, ParquetFileId};
use influxdb3_shutdown::ShutdownManager; use influxdb3_shutdown::ShutdownManager;
use influxdb3_test_helpers::object_store::RequestCountedObjectStore; use influxdb3_test_helpers::object_store::RequestCountedObjectStore;