From 66c81eb00aa4f857b954036e6f01d154601b4c16 Mon Sep 17 00:00:00 2001 From: wayne warren Date: Fri, 6 Jun 2025 19:43:52 -0600 Subject: [PATCH] refactor: move deleter module into its own crate --- Cargo.lock | 22 ++++++++++++++++ Cargo.toml | 2 +- influxdb3/Cargo.toml | 1 + influxdb3/src/commands/serve.rs | 2 +- influxdb3_deleter/Cargo.toml | 25 +++++++++++++++++++ .../src/async_collections.rs | 0 .../src/lib.rs | 19 ++++++++------ influxdb3_processing_engine/Cargo.toml | 1 + influxdb3_processing_engine/src/lib.rs | 2 +- influxdb3_server/Cargo.toml | 1 + influxdb3_server/src/lib.rs | 2 +- influxdb3_server/src/query_executor/mod.rs | 5 +++- influxdb3_server/tests/lib.rs | 2 +- influxdb3_write/Cargo.toml | 1 + influxdb3_write/src/lib.rs | 2 -- influxdb3_write/src/write_buffer/mod.rs | 4 +-- 16 files changed, 73 insertions(+), 18 deletions(-) create mode 100644 influxdb3_deleter/Cargo.toml rename {influxdb3_write => influxdb3_deleter}/src/async_collections.rs (100%) rename influxdb3_write/src/deleter.rs => influxdb3_deleter/src/lib.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index f4760966cb..7f274031cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2779,6 +2779,7 @@ dependencies = [ "influxdb3_catalog", "influxdb3_clap_blocks", "influxdb3_client", + "influxdb3_deleter", "influxdb3_process", "influxdb3_processing_engine", "influxdb3_server", @@ -2985,6 +2986,24 @@ dependencies = [ "url", ] +[[package]] +name = "influxdb3_deleter" +version = "3.2.0-nightly" +dependencies = [ + "async-trait", + "futures-util", + "influxdb3_catalog", + "influxdb3_id", + "influxdb3_shutdown", + "iox_time", + "object_store", + "observability_deps", + "parking_lot", + "test-log", + "test_helpers", + "tokio", +] + [[package]] name = "influxdb3_id" version = "3.2.0-nightly" @@ -3067,6 +3086,7 @@ dependencies = [ "influxdb3_cache", "influxdb3_catalog", "influxdb3_client", + "influxdb3_deleter", "influxdb3_internal_api", "influxdb3_py_api", "influxdb3_shutdown", @@ -3152,6 +3172,7 @@ dependencies = [ "influxdb3_cache", "influxdb3_catalog", "influxdb3_client", + "influxdb3_deleter", "influxdb3_id", "influxdb3_internal_api", "influxdb3_process", @@ -3354,6 +3375,7 @@ dependencies = [ "influxdb-line-protocol", "influxdb3_cache", "influxdb3_catalog", + "influxdb3_deleter", "influxdb3_id", "influxdb3_internal_api", "influxdb3_py_api", diff --git a/Cargo.toml b/Cargo.toml index 47f009e8b3..5631ffec8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ members = [ "influxdb3_cache", "influxdb3_catalog", "influxdb3_clap_blocks", - "influxdb3_client", + "influxdb3_client", "influxdb3_deleter", "influxdb3_id", "influxdb3_internal_api", "influxdb3_load_generator", diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index a28c4eec61..561f56b558 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -32,6 +32,7 @@ influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" } +influxdb3_deleter = { path = "../influxdb3_deleter" } influxdb3_process = { path = "../influxdb3_process", default-features = false } influxdb3_processing_engine = {path = "../influxdb3_processing_engine"} influxdb3_server = { path = "../influxdb3_server" } diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index 7d5c358bc5..ccd2f38567 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -17,6 +17,7 @@ use influxdb3_clap_blocks::{ socket_addr::SocketAddr, tokio::TokioDatafusionConfig, }; +use influxdb3_deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_process::{ INFLUXDB3_GIT_HASH, INFLUXDB3_VERSION, PROCESS_START_TIME, PROCESS_UUID_STR, }; @@ -41,7 +42,6 @@ use influxdb3_telemetry::{ use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ WriteBuffer, - deleter::{DeleteManager, DeleteManagerArgs}, persister::Persister, write_buffer::{ WriteBufferImpl, WriteBufferImplArgs, check_mem_and_force_snapshot_loop, diff --git a/influxdb3_deleter/Cargo.toml b/influxdb3_deleter/Cargo.toml new file mode 100644 index 0000000000..8315efb61b --- /dev/null +++ b/influxdb3_deleter/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "influxdb3_deleter" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +futures-util.workspace = true +influxdb3_catalog = { path = "../influxdb3_catalog" } +influxdb3_id = { path = "../influxdb3_id" } +influxdb3_shutdown = { path = "../influxdb3_shutdown" } +iox_time.workspace = true +object_store.workspace = true +observability_deps.workspace = true +parking_lot.workspace = true +tokio.workspace = true + +[lints] +workspace = true + +[dev-dependencies] +test-log.workspace = true +test_helpers.workspace = true diff --git a/influxdb3_write/src/async_collections.rs b/influxdb3_deleter/src/async_collections.rs similarity index 100% rename from influxdb3_write/src/async_collections.rs rename to influxdb3_deleter/src/async_collections.rs diff --git a/influxdb3_write/src/deleter.rs b/influxdb3_deleter/src/lib.rs similarity index 99% rename from influxdb3_write/src/deleter.rs rename to influxdb3_deleter/src/lib.rs index 050bb6d70e..299616664b 100644 --- a/influxdb3_write/src/deleter.rs +++ b/influxdb3_deleter/src/lib.rs @@ -1,15 +1,18 @@ -use crate::async_collections; +use std::sync::Arc; + use async_trait::async_trait; +use iox_time::{Time, TimeProvider}; +use object_store::ObjectStore; +use observability_deps::tracing::info; + use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::channel::CatalogUpdateReceiver; use influxdb3_catalog::log::{CatalogBatch, DatabaseCatalogOp, SoftDeleteTableLog}; use influxdb3_catalog::resource::CatalogResource; use influxdb3_id::{DbId, ParquetFileId, TableId}; use influxdb3_shutdown::ShutdownToken; -use iox_time::{Time, TimeProvider}; -use object_store::ObjectStore; -use observability_deps::tracing::info; -use std::sync::Arc; + +mod async_collections; /// Trait for deleting database objects from object storage. #[async_trait] @@ -224,10 +227,10 @@ impl ObjectDeleter for ObjectStoreDeleter { #[cfg(test)] mod tests { - use crate::async_collections; - use crate::deleter::DeleteManager; + use super::*; + + use crate::async_collections; - use super::{DeleteManagerArgs, DeleteTask, ObjectDeleter}; use influxdb3_catalog::catalog::{Catalog, HardDeletionTime}; use influxdb3_catalog::log::FieldDataType; use influxdb3_catalog::resource::CatalogResource; diff --git a/influxdb3_processing_engine/Cargo.toml b/influxdb3_processing_engine/Cargo.toml index a3ea154505..53d86654c5 100644 --- a/influxdb3_processing_engine/Cargo.toml +++ b/influxdb3_processing_engine/Cargo.toml @@ -20,6 +20,7 @@ iox_http_util.workspace = true iox_time.workspace = true influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } +influxdb3_deleter = { path = "../influxdb3_deleter" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_py_api = { path = "../influxdb3_py_api" } influxdb3_types = { path = "../influxdb3_types" } diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index ed891de62a..960346e81b 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -753,11 +753,11 @@ mod tests { use influxdb3_catalog::CatalogError; use influxdb3_catalog::catalog::Catalog; use influxdb3_catalog::log::{TriggerSettings, TriggerSpecificationDefinition}; + use influxdb3_deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor; use influxdb3_shutdown::ShutdownManager; use influxdb3_sys_events::SysEventStore; use influxdb3_wal::{Gen1Duration, WalConfig}; - use influxdb3_write::deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_write::persister::Persister; use influxdb3_write::write_buffer::{WriteBufferImpl, WriteBufferImplArgs}; use influxdb3_write::{Precision, WriteBuffer}; diff --git a/influxdb3_server/Cargo.toml b/influxdb3_server/Cargo.toml index 28896cb266..c884c1d0a7 100644 --- a/influxdb3_server/Cargo.toml +++ b/influxdb3_server/Cargo.toml @@ -38,6 +38,7 @@ influxdb3_authz = { path = "../influxdb3_authz" } influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } influxdb3_client = { path = "../influxdb3_client" } +influxdb3_deleter = { path = "../influxdb3_deleter" } influxdb3_id = { path = "../influxdb3_id" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_process = { path = "../influxdb3_process", default-features = false } diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 3d25998df1..64ce9fd710 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -292,6 +292,7 @@ mod tests { use influxdb3_cache::last_cache::LastCacheProvider; use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_catalog::catalog::Catalog; + use influxdb3_deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_processing_engine::ProcessingEngineManagerImpl; use influxdb3_processing_engine::environment::DisabledManager; use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager; @@ -299,7 +300,6 @@ mod tests { use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::WalConfig; - use influxdb3_write::deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_write::persister::Persister; use influxdb3_write::write_buffer::persisted_files::PersistedFiles; use influxdb3_write::{Bufferer, WriteBuffer}; diff --git a/influxdb3_server/src/query_executor/mod.rs b/influxdb3_server/src/query_executor/mod.rs index 1a99ef9a58..d1528a1508 100644 --- a/influxdb3_server/src/query_executor/mod.rs +++ b/influxdb3_server/src/query_executor/mod.rs @@ -785,13 +785,16 @@ mod tests { parquet_cache::test_cached_obj_store_and_oracle, }; use influxdb3_catalog::catalog::Catalog; + use influxdb3_deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_shutdown::ShutdownManager; use influxdb3_sys_events::SysEventStore; use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ - deleter::{DeleteManager, DeleteManagerArgs}, persister::Persister, write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl, WriteBufferImplArgs}, Bufferer, WriteBuffer + Bufferer, WriteBuffer, + persister::Persister, + write_buffer::{WriteBufferImpl, WriteBufferImplArgs, persisted_files::PersistedFiles}, }; use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig}; use iox_time::{MockProvider, Time}; diff --git a/influxdb3_server/tests/lib.rs b/influxdb3_server/tests/lib.rs index a99740b135..98776fd176 100644 --- a/influxdb3_server/tests/lib.rs +++ b/influxdb3_server/tests/lib.rs @@ -6,6 +6,7 @@ use datafusion::assert_batches_sorted_eq; use futures::TryStreamExt; use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider}; use influxdb3_catalog::catalog::Catalog; +use influxdb3_deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_internal_api::query_executor::QueryExecutor; use influxdb3_server::query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl}; use influxdb3_shutdown::ShutdownManager; @@ -14,7 +15,6 @@ use influxdb3_telemetry::store::TelemetryStore; use influxdb3_wal::{Gen1Duration, WalConfig}; use influxdb3_write::{ Precision, WriteBuffer, - deleter::{DeleteManager, DeleteManagerArgs}, persister::Persister, write_buffer::{WriteBufferImpl, WriteBufferImplArgs}, }; diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 4b1c974d73..2e91071c7e 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -26,6 +26,7 @@ trace.workspace = true # Local deps influxdb3_cache = { path = "../influxdb3_cache" } influxdb3_catalog = { path = "../influxdb3_catalog" } +influxdb3_deleter = { path = "../influxdb3_deleter" } influxdb3_id = { path = "../influxdb3_id" } influxdb3_internal_api = { path = "../influxdb3_internal_api" } influxdb3_py_api = {path = "../influxdb3_py_api"} diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index d4a4774264..88a14d4945 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -4,9 +4,7 @@ //! data into parquet files that are persisted to object storage. A snapshot file is written that contains the //! metadata of the parquet files that were written in that snapshot. -pub(crate) mod async_collections; pub mod chunk; -pub mod deleter; pub mod paths; pub mod persister; pub mod write_buffer; diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 534d66f099..593a960533 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -14,7 +14,6 @@ use crate::{ LastCacheManager, ParquetFile, PersistedSnapshot, PersistedSnapshotVersion, Precision, WriteBuffer, WriteLineError, chunk::ParquetChunk, - deleter::DeleteTaskQueuer, persister::{Persister, PersisterError}, write_buffer::{ persisted_files::PersistedFiles, queryable_buffer::QueryableBuffer, @@ -41,6 +40,7 @@ use influxdb3_catalog::{ CatalogError, catalog::{Catalog, DatabaseSchema, Prompt, TableDefinition}, }; +use influxdb3_deleter::DeleteTaskQueuer; use influxdb3_id::{DbId, TableId}; use influxdb3_wal::{ Wal, WalConfig, WalFileNotifier, WalOp, @@ -668,7 +668,6 @@ mod tests { use super::*; use crate::PersistedSnapshot; - use crate::deleter::{DeleteManager, DeleteManagerArgs}; use crate::paths::SnapshotInfoFilePath; use crate::persister::Persister; use crate::test_helpers::WriteBufferTester; @@ -683,6 +682,7 @@ mod tests { use influxdb3_cache::parquet_cache::test_cached_obj_store_and_oracle; use influxdb3_catalog::catalog::{CatalogSequenceNumber, HardDeletionTime}; use influxdb3_catalog::log::FieldDataType; + use influxdb3_deleter::{DeleteManager, DeleteManagerArgs}; use influxdb3_id::{ColumnId, DbId, ParquetFileId}; use influxdb3_shutdown::ShutdownManager; use influxdb3_test_helpers::object_store::RequestCountedObjectStore;