feat: prune old transactions from preserved catalog

pull/24376/head
Marco Neumann 2021-09-14 12:08:17 +02:00
parent 4769b67d14
commit 3f2e46c397
8 changed files with 145 additions and 12 deletions

Cargo.lock (generated)

@@ -1753,6 +1753,7 @@ dependencies = [
"parking_lot",
"parquet",
"parquet_file",
"parse_duration",
"pprof",
"predicates",
"prettytable-rs",

@@ -131,6 +131,7 @@ once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
itertools = "0.10.1"
parquet = "5.3"
parse_duration = "2.1.1"
# used by arrow/datafusion anyway
prettytable-rs = "0.8"
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
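
The parse_duration crate added here backs the new catalog-transaction-prune-age CLI flag further down. A minimal sketch of how it turns human-readable strings into std::time::Duration (assuming only the crate's top-level parse function):

    use std::time::Duration;

    fn main() {
        // "1d", the CLI default below, parses to 24 hours.
        let age: Duration = parse_duration::parse("1d").unwrap();
        assert_eq!(age, Duration::from_secs(24 * 60 * 60));
    }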

@@ -183,6 +183,7 @@ impl Partitioner for DatabaseRules {
pub const DEFAULT_WORKER_BACKOFF_MILLIS: u64 = 1_000;
pub const DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT: u64 = 100;
pub const DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE: Duration = Duration::from_secs(24 * 60 * 60);
pub const DEFAULT_MUB_ROW_THRESHOLD: usize = 100_000;
pub const DEFAULT_PERSIST_ROW_THRESHOLD: usize = 1_000_000;
pub const DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS: u32 = 30 * 60;
@@ -215,6 +216,11 @@ pub struct LifecycleRules {
/// After how many transactions should IOx write a new checkpoint?
pub catalog_transactions_until_checkpoint: NonZeroU64,
/// Prune catalog transactions older than the given age.
///
/// Keeping old transactions can be useful for debugging.
pub catalog_transaction_prune_age: Duration,
/// Once a partition hasn't received a write for this period of time,
/// it will be compacted and, if set, persisted. Writers will generally
/// have this amount of time to send late arriving writes or this could
@@ -301,6 +307,7 @@ impl Default for LifecycleRules {
DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
)
.unwrap(),
catalog_transaction_prune_age: DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE,
late_arrive_window_seconds: NonZeroU32::new(DEFAULT_LATE_ARRIVE_WINDOW_SECONDS)
.unwrap(),
persist_row_threshold: NonZeroUsize::new(DEFAULT_PERSIST_ROW_THRESHOLD).unwrap(),
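
Callers that want a different retention period can override just the new field and keep the remaining defaults via struct-update syntax; a sketch (the pruning test further down uses the same shape):

    use data_types::database_rules::LifecycleRules;
    use std::time::Duration;

    fn main() {
        let rules = LifecycleRules {
            // keep one hour of transaction history instead of the 1-day default
            catalog_transaction_prune_age: Duration::from_secs(60 * 60),
            ..Default::default()
        };
        assert_eq!(rules.catalog_transaction_prune_age, Duration::from_secs(3600));
    }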

@@ -62,6 +62,13 @@ message LifecycleRules {
// If 0 / absent, this defaults to 100.
uint64 catalog_transactions_until_checkpoint = 11;
// Prune catalog transactions older than the given age.
//
// Keeping old transactions can be useful for debugging.
//
// Defaults to 1 day.
google.protobuf.Duration catalog_transaction_prune_age = 19;
/// Once a partition hasn't received a write for this period of time,
/// it will be compacted and, if set, persisted. Writers will generally
/// have this amount of time to send late arriving writes or this could

@@ -1,12 +1,12 @@
use crate::google::FromFieldOpt;
use crate::google::{FieldViolationExt, FromFieldOpt};
use std::convert::{TryFrom, TryInto};
use std::num::{NonZeroU32, NonZeroU64, NonZeroUsize};
use data_types::database_rules::{
LifecycleRules, MaxActiveCompactions, DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT,
DEFAULT_LATE_ARRIVE_WINDOW_SECONDS, DEFAULT_MUB_ROW_THRESHOLD,
DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS, DEFAULT_PERSIST_ROW_THRESHOLD,
DEFAULT_WORKER_BACKOFF_MILLIS,
DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE, DEFAULT_LATE_ARRIVE_WINDOW_SECONDS,
DEFAULT_MUB_ROW_THRESHOLD, DEFAULT_PERSIST_AGE_THRESHOLD_SECONDS,
DEFAULT_PERSIST_ROW_THRESHOLD, DEFAULT_WORKER_BACKOFF_MILLIS,
};
use crate::google::FieldViolation;
@@ -30,6 +30,7 @@ impl From<LifecycleRules> for management::LifecycleRules {
catalog_transactions_until_checkpoint: config
.catalog_transactions_until_checkpoint
.get(),
catalog_transaction_prune_age: Some(config.catalog_transaction_prune_age.into()),
late_arrive_window_seconds: config.late_arrive_window_seconds.get(),
persist_row_threshold: config.persist_row_threshold.get() as u64,
persist_age_threshold_seconds: config.persist_age_threshold_seconds.get(),
@@ -74,6 +75,10 @@ impl TryFrom<management::LifecycleRules> for LifecycleRules {
.unwrap_or_else(|| {
NonZeroU64::new(DEFAULT_CATALOG_TRANSACTIONS_UNTIL_CHECKPOINT).unwrap()
}),
catalog_transaction_prune_age: match proto.catalog_transaction_prune_age {
Some(d) => d.try_into().field("catalog_transaction_prune_age")?,
None => DEFAULT_CATALOG_TRANSACTION_PRUNE_AGE,
},
late_arrive_window_seconds: NonZeroU32::new(proto.late_arrive_window_seconds)
.unwrap_or_else(|| NonZeroU32::new(DEFAULT_LATE_ARRIVE_WINDOW_SECONDS).unwrap()),
persist_row_threshold: NonZeroUsize::new(proto.persist_row_threshold as usize)
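
The try_into above can fail because google.protobuf.Duration admits negative values that std::time::Duration cannot represent; .field(...) attaches the field name to the resulting violation. The happy path in isolation (a sketch, assuming the google_types wrapper converts like prost's Duration type):

    use std::convert::TryInto;
    use std::time::Duration;

    fn main() {
        let proto = google_types::protobuf::Duration { seconds: 11, nanos: 22 };
        // A negative duration would make this conversion fail instead.
        let age: Duration = proto.try_into().expect("non-negative duration");
        assert_eq!(age, Duration::new(11, 22));
    }
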
@@ -124,6 +129,10 @@ mod tests {
management::lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(8),
),
catalog_transactions_until_checkpoint: 10,
catalog_transaction_prune_age: Some(google_types::protobuf::Duration {
seconds: 11,
nanos: 22,
}),
late_arrive_window_seconds: 23,
persist_row_threshold: 57,
persist_age_threshold_seconds: 23,
@@ -152,6 +161,14 @@
back.max_active_compactions_cfg,
protobuf.max_active_compactions_cfg
);
assert_eq!(
back.catalog_transactions_until_checkpoint,
protobuf.catalog_transactions_until_checkpoint
);
assert_eq!(
back.catalog_transaction_prune_age,
protobuf.catalog_transaction_prune_age
);
assert_eq!(
back.late_arrive_window_seconds,
protobuf.late_arrive_window_seconds

@@ -36,6 +36,7 @@ use observability_deps::tracing::{debug, error, info};
use parquet_file::catalog::{
api::{CatalogParquetInfo, CheckpointData, PreservedCatalog},
cleanup::{delete_files as delete_parquet_files, get_unreferenced_parquet_files},
prune::prune_history as prune_catalog_transaction_history,
};
use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::PersistenceWindows};
use query::{
@@ -894,23 +895,42 @@ impl Db {
self.worker_iterations_cleanup
.fetch_add(1, Ordering::Relaxed);
// read relevant parts of the db rules
let (avg_sleep_secs, catalog_transaction_prune_age) = {
let guard = self.rules.read();
let avg_sleep_secs = guard.worker_cleanup_avg_sleep.as_secs_f32().max(1.0);
let catalog_transaction_prune_age =
guard.lifecycle_rules.catalog_transaction_prune_age;
(avg_sleep_secs, catalog_transaction_prune_age)
};
// Sleep for a duration drawn from a Poisson distribution to de-correlate workers.
// Perform this sleep BEFORE the actual clean-up so that we don't immediately run a clean-up
// on startup.
let avg_sleep_secs = self
.rules
.read()
.worker_cleanup_avg_sleep
.as_secs_f32()
.max(1.0);
let dist =
Poisson::new(avg_sleep_secs).expect("parameter should be positive and finite");
let duration = Duration::from_secs_f32(dist.sample(&mut rand::thread_rng()));
debug!(?duration, "cleanup worker sleeps");
tokio::time::sleep(duration).await;
match chrono::Duration::from_std(catalog_transaction_prune_age) {
Ok(catalog_transaction_prune_age) => {
if let Err(e) = prune_catalog_transaction_history(
self.iox_object_store(),
Utc::now() - catalog_transaction_prune_age,
)
.await
{
error!(%e, "error while pruning catalog transactions");
}
}
Err(e) => {
error!(%e, "cannot convert `catalog_transaction_prune_age`, skipping transaction pruning");
}
}
if let Err(e) = self.cleanup_unreferenced_parquet_files().await {
error!(%e, "error in background cleanup task");
error!(%e, "error while cleaning unreferenced parquet files");
}
}
};
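
The only fallible step before pruning is chrono::Duration::from_std, which errors when the configured age exceeds chrono's representable range. The cutoff computation in isolation (a standalone sketch):

    use chrono::Utc;
    use std::time::Duration;

    fn main() {
        let prune_age = Duration::from_secs(24 * 60 * 60);
        let age = chrono::Duration::from_std(prune_age).expect("within chrono's range");
        // Transactions recorded before `cutoff` become candidates for pruning.
        let cutoff = Utc::now() - age;
        println!("pruning transactions older than {}", cutoff);
    }
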
@@ -4086,6 +4106,70 @@ mod tests {
write_lp(db.as_ref(), "cpu bar=1 10").await;
}
#[tokio::test]
async fn transaction_pruning() {
// Test that the background worker prunes transactions
// ==================== setup ====================
let object_store = Arc::new(ObjectStore::new_in_memory());
let server_id = ServerId::try_from(1).unwrap();
let db_name = "transaction_pruning_test";
// ==================== do: create DB ====================
// Create a DB given a server id, an object store and a db name
let test_db = TestDb::builder()
.object_store(Arc::clone(&object_store))
.server_id(server_id)
.db_name(db_name)
.lifecycle_rules(LifecycleRules {
catalog_transactions_until_checkpoint: NonZeroU64::try_from(1).unwrap(),
catalog_transaction_prune_age: Duration::from_millis(1),
late_arrive_window_seconds: NonZeroU32::try_from(1).unwrap(),
..Default::default()
})
.build()
.await;
let db = Arc::new(test_db.db);
// ==================== do: write data to parquet ====================
create_parquet_chunk(&db).await;
// ==================== do: start background task loop ====================
let shutdown: CancellationToken = Default::default();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let join_handle =
tokio::spawn(async move { db_captured.background_worker(shutdown_captured).await });
// ==================== check: after a while the first transaction (revision 0) should be pruned ====================
let t_0 = Instant::now();
loop {
let all_revisions = db
.iox_object_store()
.catalog_transaction_files()
.await
.unwrap()
.map_ok(|files| {
files
.into_iter()
.map(|f| f.revision_counter)
.collect::<Vec<u64>>()
})
.try_concat()
.await
.unwrap();
if !all_revisions.contains(&0) {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
tokio::time::sleep(Duration::from_millis(100)).await;
}
// ==================== do: stop background task loop ====================
shutdown.cancel();
join_handle.await.unwrap();
}
#[tokio::test]
async fn table_wide_schema_enforcement() {
// need a table with a partition template that uses a tag column, so that we can easily write to different partitions

@@ -11,7 +11,10 @@ use influxdb_iox_client::{
},
write::{self, WriteError},
};
use std::{convert::TryInto, fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr};
use std::{
convert::TryInto, fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr,
time::Duration,
};
use structopt::StructOpt;
use thiserror::Error;
@@ -98,6 +101,12 @@ struct Create {
#[structopt(long, default_value = "100", parse(try_from_str))]
catalog_transactions_until_checkpoint: NonZeroU64,
/// Prune catalog transactions older than the given age.
///
/// Keeping old transactions can be useful for debugging.
#[structopt(long, default_value = "1d", parse(try_from_str = parse_duration::parse))]
catalog_transaction_prune_age: Duration,
/// Once a partition hasn't received a write for this period of time,
/// it will be compacted and, if set, persisted. Writers will generally
/// have this amount of time to send late arriving writes or this could
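
Since structopt derives kebab-case flag names, this field is exposed as --catalog-transaction-prune-age, and parse_duration accepts values such as 1d, 6h, or 30m. A hypothetical invocation (database name made up for illustration): influxdb_iox database create my_db --catalog-transaction-prune-age 6h
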
@@ -206,6 +215,9 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
catalog_transactions_until_checkpoint: command
.catalog_transactions_until_checkpoint
.get(),
catalog_transaction_prune_age: Some(
command.catalog_transaction_prune_age.into(),
),
late_arrive_window_seconds: command.late_arrive_window_seconds,
persist_row_threshold: command.persist_row_threshold,
persist_age_threshold_seconds: command.persist_age_threshold_seconds,

@@ -334,6 +334,10 @@ async fn test_create_get_update_delete_database() {
lifecycle_rules: Some(LifecycleRules {
buffer_size_hard: 553,
catalog_transactions_until_checkpoint: 13,
catalog_transaction_prune_age: Some(generated_types::google::protobuf::Duration {
seconds: 11,
nanos: 22,
}),
late_arrive_window_seconds: 423,
worker_backoff_millis: 15,
max_active_compactions_cfg: Some(