diff --git a/Cargo.lock b/Cargo.lock
index aa6d281291..5e9f594d64 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1753,6 +1753,7 @@ dependencies = [
  "parking_lot",
  "parquet",
  "parquet_file",
+ "parse_duration",
  "pprof",
  "predicates",
  "prettytable-rs",
diff --git a/Cargo.toml b/Cargo.toml
index 31827e74f2..314a0bf676 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -131,6 +131,7 @@
 once_cell = { version = "1.4.0", features = ["parking_lot"] }
 parking_lot = "0.11.2"
 itertools = "0.10.1"
 parquet = "5.3"
+parse_duration = "2.1.1" # used by arrow/datafusion anyway
 prettytable-rs = "0.8"
 pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs
index d78cb27de2..54ed695f32 100644
--- a/data_types/src/database_rules.rs
+++ b/data_types/src/database_rules.rs
@@ -183,6 +183,7 @@ impl Partitioner for DatabaseRules {

 pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
 pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
+pub const DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE: Duration = Duration::from_secs(24 * 60 * 60);
 pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
 pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;
 pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
@@ -215,6 +216,11 @@ pub struct LifecycleRules {
     /// After how many transactions should IOx write a new checkpoint?
     pub catalog_transactions_until_checkpoint: NonZeroU64,

+    /// Prune catalog transactions older than the given age.
+    ///
+    /// Keeping old transactions can be useful for debugging.
+    pub catalog_transaction_prune_age: Duration,
+
     /// Once a partition hasn't received a write for this period of time,
     /// it will be compacted and, if set, persisted. Writers will generally
     /// have this amount of time to send late arriving writes or this could
@@ -301,6 +307,7 @@ impl Default for LifecycleRules {
                 DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
             )
             .unwrap(),
+            catalog_transaction_prune_age: DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE,
             late_arrive_window_seconds: NonZeroU32::new(DEFAULT_LATE_ARRIVE_WINDOW_SECONDS)
                 .unwrap(),
             persist_row_threshold: NonZeroUsize::new(DEFAULT_PERSIST_ROW_THRESHOLD).unwrap(),
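(Aside, not part of the diff: a minimal sketch of how the new field composes with `Default`, mirroring the struct-update pattern the `transaction_pruning` test below uses; the 6-hour value is just an example, not a suggested setting.)

```rust
use std::time::Duration;

use data_types::database_rules::LifecycleRules;

// Override only the prune age; every other field keeps its default,
// including the 1-day DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE added above.
let rules = LifecycleRules {
    catalog_transaction_prune_age: Duration::from_secs(6 * 60 * 60),
    ..Default::default()
};
assert_eq!(rules.catalog_transaction_prune_age.as_secs(), 6 * 60 * 60);
```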
diff --git a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto
index d96f745a4e..5200effe62 100644
--- a/generated_types/protos/influxdata/iox/management/v1/database_rules.proto
+++ b/generated_types/protos/influxdata/iox/management/v1/database_rules.proto
@@ -62,6 +62,13 @@ message LifecycleRules {
   // If 0 / absent, this default to 100.
   uint64 catalog_transactions_until_checkpoint = 11;

+  // Prune catalog transactions older than the given age.
+  //
+  // Keeping old transactions can be useful for debugging.
+  //
+  // Defaults to 1 day.
+  google.protobuf.Duration catalog_transaction_prune_age = 19;
+
   /// Once a partition hasn't received a write for this period of time,
   /// it will be compacted and, if set, persisted. Writers will generally
   /// have this amount of time to send late arriving writes or this could
diff --git a/generated_types/src/database_rules/lifecycle.rs b/generated_types/src/database_rules/lifecycle.rs
index 7b49d668de..4b3da38314 100644
--- a/generated_types/src/database_rules/lifecycle.rs
+++ b/generated_types/src/database_rules/lifecycle.rs
@@ -1,12 +1,12 @@
-use crate::google::FromFieldOpt;
+use crate::google::{FieldViolationExt, FromFieldOpt};
 use std::convert::{TryFrom, TryInto};
 use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};

 use data_types::database_rules::{
     LifecycleRules, MaxActiveCompactions, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
-    DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_MUB_ROW_THRESHOLD,
-    DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, DEFAULT_PERSIST_ROW_THRESHOLD,
-    DEFAULT_WORKER_BACKOFF_MILLIS,
+    DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE, DEFAULT_LATE_ARRIVE_WINDOW_SECONDS,
+    DEFAULT_MUB_ROW_THRESHOLD, DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS,
+    DEFAULT_PERSIST_ROW_THRESHOLD, DEFAULT_WORKER_BACKOFF_MILLIS,
 };

 use crate::google::FieldViolation;
@@ -30,6 +30,7 @@ impl From<LifecycleRules> for management::LifecycleRules {
             catalog_transactions_until_checkpoint: config
                 .catalog_transactions_until_checkpoint
                 .get(),
+            catalog_transaction_prune_age: Some(config.catalog_transaction_prune_age.into()),
             late_arrive_window_seconds: config.late_arrive_window_seconds.get(),
             persist_row_threshold: config.persist_row_threshold.get() as u64,
             persist_age_threshold_seconds: config.persist_age_threshold_seconds.get(),
@@ -74,6 +75,10 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
                 .unwrap_or_else(|| {
                     NonZeroU64::new(DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT).unwrap()
                 }),
+            catalog_transaction_prune_age: match proto.catalog_transaction_prune_age {
+                Some(d) => d.try_into().field("catalog_transaction_prune_age")?,
+                None => DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE,
+            },
             late_arrive_window_seconds: NonZeroU32::new(proto.late_arrive_window_seconds)
                 .unwrap_or_else(|| NonZeroU32::new(DEFAULT_LATE_ARRIVE_WINDOW_SECONDS).unwrap()),
             persist_row_threshold: NonZeroUsize::new(proto.persist_row_threshold as usize)
@@ -124,6 +129,10 @@ mod tests {
                 management::lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(8),
             ),
             catalog_transactions_until_checkpoint: 10,
+            catalog_transaction_prune_age: Some(google_types::protobuf::Duration {
+                seconds: 11,
+                nanos: 22,
+            }),
             late_arrive_window_seconds: 23,
             persist_row_threshold: 57,
             persist_age_threshold_seconds: 23,
@@ -152,6 +161,14 @@
         assert_eq!(
             back.max_active_compactions_cfg,
             protobuf.max_active_compactions_cfg
         );
+        assert_eq!(
+            back.catalog_transactions_until_checkpoint,
+            protobuf.catalog_transactions_until_checkpoint
+        );
+        assert_eq!(
+            back.catalog_transaction_prune_age,
+            protobuf.catalog_transaction_prune_age
+        );
         assert_eq!(
             back.late_arrive_window_seconds,
             protobuf.late_arrive_window_seconds
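(Aside: the two conversions above give a lossless round trip for non-negative durations. A sketch, with error handling collapsed to `expect`; the `google_types::protobuf::Duration` path follows the test above:)

```rust
use std::convert::TryInto;
use std::time::Duration;

// std -> proto, as in the `From<LifecycleRules>` impl above.
let age = Duration::from_secs(24 * 60 * 60);
let proto: google_types::protobuf::Duration = age.into();

// proto -> std, as in the `TryFrom` impl above; this conversion can fail,
// e.g. for negative protobuf durations, hence the `field(...)` error mapping there.
let back: Duration = proto.try_into().expect("non-negative duration");
assert_eq!(back, age);
```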
diff --git a/server/src/db.rs b/server/src/db.rs
index b79456b844..767da579e8 100644
--- a/server/src/db.rs
+++ b/server/src/db.rs
@@ -36,6 +36,7 @@ use observability_deps::tracing::{debug, error, info};
 use parquet_file::catalog::{
     api::{CatalogParquetInfo, CheckpointData, PreservedCatalog},
     cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
+    prune::prune_history as prune_catalog_transaction_history,
 };
 use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
 use query::{
@@ -894,23 +895,42 @@ impl Db {
                 self.worker_iterations_cleanup
                     .fetch_add(1, Ordering::Relaxed);

+                // read relevant parts of the db rules
+                let (avg_sleep_secs, catalog_transaction_prune_age) = {
+                    let guard = self.rules.read();
+                    let avg_sleep_secs = guard.worker_cleanup_avg_sleep.as_secs_f32().max(1.0);
+                    let catalog_transaction_prune_age =
+                        guard.lifecycle_rules.catalog_transaction_prune_age;
+                    (avg_sleep_secs, catalog_transaction_prune_age)
+                };
+
                 // Sleep for a duration drawn from a poisson distribution to de-correlate workers.
                 // Perform this sleep BEFORE the actual clean-up so that we don't immediately run a clean-up
                 // on startup.
-                let avg_sleep_secs = self
-                    .rules
-                    .read()
-                    .worker_cleanup_avg_sleep
-                    .as_secs_f32()
-                    .max(1.0);
                 let dist =
                     Poisson::new(avg_sleep_secs).expect("parameter should be positive and finite");
                 let duration = Duration::from_secs_f32(dist.sample(&mut rand::thread_rng()));
                 debug!(?duration, "cleanup worker sleeps");
                 tokio::time::sleep(duration).await;

+                match chrono::Duration::from_std(catalog_transaction_prune_age) {
+                    Ok(catalog_transaction_prune_age) => {
+                        if let Err(e) = prune_catalog_transaction_history(
+                            self.iox_object_store(),
+                            Utc::now() - catalog_transaction_prune_age,
+                        )
+                        .await
+                        {
+                            error!(%e, "error while pruning catalog transactions");
+                        }
+                    }
+                    Err(e) => {
+                        error!(%e, "cannot convert `catalog_transaction_prune_age`, skipping transaction pruning");
+                    }
+                }
+
                 if let Err(e) = self.cleanup_unreferenced_parquet_files().await {
-                    error!(%e, "error in background cleanup task");
+                    error!(%e, "error while cleaning unreferenced parquet files");
                 }
             }
         };
@@ -4086,6 +4106,70 @@ mod tests {
         write_lp(db.as_ref(), "cpu bar=1 10").await;
     }

+    #[tokio::test]
+    async fn transaction_pruning() {
+        // Test that the background worker prunes transactions
+
+        // ==================== setup ====================
+        let object_store = Arc::new(ObjectStore::new_in_memory());
+        let server_id = ServerId::try_from(1).unwrap();
+        let db_name = "transaction_pruning_test";
+
+        // ==================== do: create DB ====================
+        // Create a DB given a server id, an object store and a db name
+        let test_db = TestDb::builder()
+            .object_store(Arc::clone(&object_store))
+            .server_id(server_id)
+            .db_name(db_name)
+            .lifecycle_rules(LifecycleRules {
+                catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(),
+                catalog_transaction_prune_age: Duration::from_millis(1),
+                late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
+                ..Default::default()
+            })
+            .build()
+            .await;
+        let db = Arc::new(test_db.db);
+
+        // ==================== do: write data to parquet ====================
+        create_parquet_chunk(&db).await;
+
+        // ==================== do: start background task loop ====================
+        let shutdown: CancellationToken = Default::default();
+        let shutdown_captured = shutdown.clone();
+        let db_captured = Arc::clone(&db);
+        let join_handle =
+            tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
+
+        // ==================== check: after a while the first transaction should be pruned ====================
+        let t_0 = Instant::now();
+        loop {
+            let all_revisions = db
+                .iox_object_store()
+                .catalog_transaction_files()
+                .await
+                .unwrap()
+                .map_ok(|files| {
+                    files
+                        .into_iter()
+                        .map(|f| f.revision_counter)
+                        .collect::<Vec<_>>()
+                })
+                .try_concat()
+                .await
+                .unwrap();
+            if !all_revisions.contains(&0) {
+                break;
+            }
+            assert!(t_0.elapsed() < Duration::from_secs(10));
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+
+        // ==================== do: stop background task loop ====================
+        shutdown.cancel();
+        join_handle.await.unwrap();
+    }
+
     #[tokio::test]
     async fn table_wide_schema_enforcement() {
         // need a table with a partition template that uses a tag column, so that we can easily write to different partitions
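(Aside: the pruning cutoff handed to `prune_history` in the worker loop above boils down to subtracting the configured age from the current time. A minimal sketch, assuming the same `chrono` APIs used in the diff:)

```rust
use chrono::Utc;

// `from_std` fails if the std duration exceeds chrono's representable range,
// which is why the worker loop above matches on the Result and skips pruning on error.
let age = std::time::Duration::from_secs(24 * 60 * 60);
let cutoff = Utc::now() - chrono::Duration::from_std(age).expect("age within chrono's range");
// `prune_catalog_transaction_history` then removes transactions older than `cutoff`.
```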
diff --git a/src/commands/database.rs b/src/commands/database.rs
index 80cdb62147..6a5f355390 100644
--- a/src/commands/database.rs
+++ b/src/commands/database.rs
@@ -11,7 +11,10 @@ use influxdb_iox_client::{
     },
     write::{self, WriteError},
 };
-use std::{convert::TryInto, fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr};
+use std::{
+    convert::TryInto, fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr,
+    time::Duration,
+};
 use structopt::StructOpt;
 use thiserror::Error;

@@ -98,6 +101,12 @@ struct Create {
     #[structopt(long, default_value = "100", parse(try_from_str))]
     catalog_transactions_until_checkpoint: NonZeroU64,

+    /// Prune catalog transactions older than the given age.
+    ///
+    /// Keeping old transactions can be useful for debugging.
+    #[structopt(long, default_value = "1d", parse(try_from_str = parse_duration::parse))]
+    catalog_transaction_prune_age: Duration,
+
     /// Once a partition hasn't received a write for this period of time,
     /// it will be compacted and, if set, persisted. Writers will generally
     /// have this amount of time to send late arriving writes or this could
@@ -206,6 +215,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
                     catalog_transactions_until_checkpoint: command
                         .catalog_transactions_until_checkpoint
                         .get(),
+                    catalog_transaction_prune_age: Some(
+                        command.catalog_transaction_prune_age.into(),
+                    ),
                     late_arrive_window_seconds: command.late_arrive_window_seconds,
                     persist_row_threshold: command.persist_row_threshold,
                     persist_age_threshold_seconds: command.persist_age_threshold_seconds,
diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs
index 622af0da37..634d7bae28 100644
--- a/tests/end_to_end_cases/management_api.rs
+++ b/tests/end_to_end_cases/management_api.rs
@@ -334,6 +334,10 @@ async fn test_create_get_update_delete_database() {
         lifecycle_rules: Some(LifecycleRules {
             buffer_size_hard: 553,
             catalog_transactions_until_checkpoint: 13,
+            catalog_transaction_prune_age: Some(generated_types::google::protobuf::Duration {
+                seconds: 11,
+                nanos: 22,
+            }),
             late_arrive_window_seconds: 423,
             worker_backoff_millis: 15,
             max_active_compactions_cfg: Some(
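(Aside: the `default_value = "1d"` in `src/commands/database.rs` above is parsed by the newly added `parse_duration` crate; with structopt's kebab-case default, the flag would spell `--catalog-transaction-prune-age`. A sketch of what the default parses to:)

```rust
// "1d" is one day; parse_duration also accepts forms like "30m" or "2h".
let age = parse_duration::parse("1d").expect("valid duration literal");
assert_eq!(age, std::time::Duration::from_secs(24 * 60 * 60));
```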