diff --git a/Cargo.lock b/Cargo.lock index 75ef40f929..dbff294ff2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,9 +56,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56fc6cf8dc8c4158eed8649f9b8b0ea1518eb62b544fe9490d66fa0b349eafe9" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" [[package]] name = "android-tzdata" @@ -132,9 +132,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.71" +version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8" +checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" [[package]] name = "arrayref" @@ -450,9 +450,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11" +checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" dependencies = [ "bzip2", "flate2", @@ -485,7 +485,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -496,7 +496,7 @@ checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -537,9 +537,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.18" +version = "0.6.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39" +checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c" dependencies = [ "async-trait", "axum-core", @@ -650,9 +650,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.4.0" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "729b71f35bd3fa1a4c86b85d32c8b9069ea7fe14f7a53cfabb65f62d4265b888" +checksum = "199c42ab6972d92c9f8995f086273d25c42fc0f7b2a1fcefba465c1352d25ba5" dependencies = [ "arrayref", "arrayvec", @@ -694,13 +694,12 @@ dependencies = [ [[package]] name = "bstr" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a246e68bb43f6cd9db24bea052a53e40405417c5fb372e3d1a8a7f770a564ef5" +checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05" dependencies = [ "memchr", - "once_cell", - "regex-automata 0.1.10", + "regex-automata 0.3.3", "serde", ] @@ -907,7 +906,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -998,14 +997,21 @@ dependencies = [ name = "compactor_scheduler" version = "0.1.0" dependencies = [ + "assert_matches", "async-trait", "backoff", "data_types", + "futures", "iox_catalog", "iox_tests", "iox_time", + "itertools 0.11.0", + "metric", "observability_deps", + "parking_lot", "sharder", + "test_helpers", + "thiserror", "tokio", "uuid", "workspace-hack", @@ -1127,9 +1133,9 @@ dependencies = [ [[package]] name = "constant_time_eq" -version = "0.2.6" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a53c0a4d288377e7415b53dcfc3c04da5cdc2cc95c8d5ac178b58f0b861ad6" 
+checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" [[package]] name = "core-foundation-sys" @@ -1148,9 +1154,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03e69e28e9f7f77debdedbaafa2866e1de9ba56df55a8bd7cfc724c25a09987c" +checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" dependencies = [ "libc", ] @@ -1327,12 +1333,12 @@ dependencies = [ [[package]] name = "dashmap" -version = "5.4.0" +version = "5.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d" dependencies = [ "cfg-if", - "hashbrown 0.12.3", + "hashbrown 0.14.0", "lock_api", "once_cell", "parking_lot_core", @@ -1454,7 +1460,7 @@ dependencies = [ "lazy_static", "sqlparser 0.35.0", "strum 0.25.0", - "strum_macros 0.25.0", + "strum_macros 0.25.1", ] [[package]] @@ -1660,9 +1666,9 @@ dependencies = [ [[package]] name = "equivalent" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" @@ -1917,7 +1923,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -2271,9 +2277,9 @@ dependencies = [ [[package]] name = "http-range-header" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" [[package]] name = "httparse" @@ -2319,10 +2325,11 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ + "futures-util", "http", "hyper", "rustls", @@ -3166,9 +3173,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.7" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0aa48fab2893d8a49caa94082ae8488f4e1050d73b367881dcd2198f4199fd8" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "jobserver" @@ -3619,9 +3626,9 @@ dependencies = [ [[package]] name = "num" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" dependencies = [ "num-bigint", "num-complex", @@ -4026,9 +4033,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73935e4d55e2abf7f130186537b19e7a4abc886a0252380b59248af473a3fc9" +checksum = "0d2d1d55045829d65aad9d389139882ad623b33b904e7c9f1b10c5b8927298e5" dependencies = [ "thiserror", "ucd-trie", 
@@ -4036,9 +4043,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aef623c9bbfa0eedf5a0efba11a5ee83209c326653ca31ff019bec3a95bfff2b" +checksum = "5f94bca7e7a599d89dea5dfa309e217e7906c3c007fb9c3299c40b10d6a315d3" dependencies = [ "pest", "pest_generator", @@ -4046,22 +4053,22 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e8cba4ec22bada7fc55ffe51e2deb6a0e0db2d0b7ab0b103acc80d2510c190" +checksum = "99d490fe7e8556575ff6911e45567ab95e71617f43781e5c05490dc8d75c965c" dependencies = [ "pest", "pest_meta", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] name = "pest_meta" -version = "2.7.0" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01f71cb40bd8bb94232df14b946909e14660e33fc05db3e50ae2a82d7ea0ca0" +checksum = "2674c66ebb4b4d9036012091b537aae5878970d6999f81a265034d85b136b341" dependencies = [ "once_cell", "pest", @@ -4133,7 +4140,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -4285,9 +4292,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.63" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -4473,9 +4480,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.29" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105" +checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" dependencies = [ "proc-macro2", ] @@ -4816,9 +4823,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.12" +version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" [[package]] name = "rustyline" @@ -4843,9 +4850,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.13" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "same-file" @@ -4886,15 +4893,15 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918" [[package]] name = "seq-macro" -version = "0.3.3" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b44e8fc93a14e66336d230954dda83d18b4605ccace8fe09bc7514a71ad0bc" +checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" @@ -4913,7 +4920,7 @@ checksum = 
"b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -5563,9 +5570,9 @@ checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb" [[package]] name = "stringprep" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -5589,7 +5596,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros 0.25.0", + "strum_macros 0.25.1", ] [[package]] @@ -5607,15 +5614,15 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.25.0" +version = "0.25.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9f3bd7d2e45dcc5e265fbb88d6513e4747d8ef9444cf01a533119bce28a157" +checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232" dependencies = [ "heck", "proc-macro2", "quote", "rustversion", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -5626,9 +5633,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "symbolic-common" -version = "12.2.0" +version = "12.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f7afd8bcd36190409e6b71d89928f7f09d918a7aa3460d847bc49a538d672e" +checksum = "167a4ffd7c35c143fd1030aa3c2caf76ba42220bd5a6b5f4781896434723b8c3" dependencies = [ "debugid", "memmap2", @@ -5638,9 +5645,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.2.0" +version = "12.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec64922563a36e3fe686b6d99f06f25dacad2a202ac7502ed642930a188fb20a" +checksum = "e378c50e80686c1c5c205674e1f86a2858bec3d2a7dfdd690331a8a19330f293" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -5660,9 +5667,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.26" +version = "2.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970" +checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0" dependencies = [ "proc-macro2", "quote", @@ -5785,7 +5792,7 @@ checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -5914,7 +5921,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -5986,9 +5993,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.19.12" +version = "0.19.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c500344a19072298cd05a7224b3c0c629348b78692bf48466c5238656e315a78" +checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a" dependencies = [ "indexmap 2.0.0", "serde", @@ -6187,7 +6194,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", ] [[package]] @@ -6302,9 +6309,9 @@ checksum = 
"497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "ucd-trie" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e79c4d996edb816c91e4308506774452e55e95c3c9de07b6729e17e15a5ef81" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" [[package]] name = "unarray" @@ -6320,9 +6327,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" [[package]] name = "unicode-ident" -version = "1.0.9" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" +checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c" [[package]] name = "unicode-normalization" @@ -6500,7 +6507,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", "wasm-bindgen-shared", ] @@ -6534,7 +6541,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.26", + "syn 2.0.27", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6796,9 +6803,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.4.7" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca0ace3845f0d96209f0375e6d367e3eb87eb65d27d445bdc9f1843a26f39448" +checksum = "25b5872fa2e10bd067ae946f927e726d7d603eaeb6e02fa6a350e0722d2b8c11" dependencies = [ "memchr", ] @@ -6890,7 +6897,7 @@ dependencies = [ "sqlx-postgres", "sqlx-sqlite", "syn 1.0.109", - "syn 2.0.26", + "syn 2.0.27", "thrift", "tokio", "tokio-stream", diff --git a/compactor/src/components/commit.rs b/compactor/src/components/commit.rs new file mode 100644 index 0000000000..6460306264 --- /dev/null +++ b/compactor/src/components/commit.rs @@ -0,0 +1,51 @@ +use std::sync::Arc; + +use compactor_scheduler::{ + CommitUpdate, CompactionJob, CompactionJobStatus, CompactionJobStatusResponse, + CompactionJobStatusVariant, Scheduler, +}; +use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; + +#[derive(Debug)] +pub struct CommitToScheduler { + scheduler: Arc, +} + +impl CommitToScheduler { + pub fn new(scheduler: Arc) -> Self { + Self { scheduler } + } + + pub async fn commit( + &self, + partition_id: PartitionId, + delete: &[ParquetFile], + upgrade: &[ParquetFile], + create: &[ParquetFileParams], + target_level: CompactionLevel, + ) -> Result, crate::DynError> { + match self + .scheduler + .update_job_status(CompactionJobStatus { + job: CompactionJob::new(partition_id), + status: CompactionJobStatusVariant::Update(CommitUpdate::new( + partition_id, + delete.into(), + upgrade.into(), + create.into(), + target_level, + )), + }) + .await? 
+ { + CompactionJobStatusResponse::CreatedParquetFiles(ids) => Ok(ids), + CompactionJobStatusResponse::Ack => unreachable!("scheduler should not ack"), + } + } +} + +impl std::fmt::Display for CommitToScheduler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CommitToScheduler") + } +} diff --git a/compactor/src/components/hardcoded.rs b/compactor/src/components/hardcoded.rs index 78c44e1953..26ee183a38 100644 --- a/compactor/src/components/hardcoded.rs +++ b/compactor/src/components/hardcoded.rs @@ -12,11 +12,7 @@ use crate::{config::Config, error::ErrorKind, object_store::ignore_writes::Ignor use super::{ changed_files_filter::logging::LoggingChangedFiles, - combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions}, - commit::{ - catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper, - mock::MockCommit, Commit, - }, + commit::CommitToScheduler, df_plan_exec::{ dedicated::DedicatedDataFusionPlanExec, noop::NoopDataFusionPlanExec, DataFusionPlanExec, }, @@ -39,9 +35,9 @@ use super::{ }, parquet_files_sink::{dispatch::DispatchParquetFilesSink, ParquetFilesSink}, partition_done_sink::{ - catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper, - logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper, - mock::MockPartitionDoneSink, PartitionDoneSink, + error_kind::ErrorKindPartitionDoneSinkWrapper, logging::LoggingPartitionDoneSinkWrapper, + metrics::MetricsPartitionDoneSinkWrapper, outcome::PartitionDoneSinkToScheduler, + PartitionDoneSink, }, partition_files_source::{ catalog::{CatalogPartitionFilesSource, QueryRateLimiter}, @@ -93,6 +89,8 @@ pub fn hardcoded_components(config: &Config) -> Arc { config.scheduler_config.clone(), Arc::clone(&config.catalog), Arc::clone(&config.time_provider), + Arc::clone(&config.metric_registry), + config.shadow_mode, ); let (partitions_source, commit, partition_done_sink) = make_partitions_source_commit_partition_sink(config, Arc::clone(&scheduler)); @@ -123,52 +121,17 @@ fn make_partitions_source_commit_partition_sink( scheduler: Arc, ) -> ( Arc, - Arc, + Arc, Arc, ) { - let partitions_source = ScheduledPartitionsSource::new(scheduler); + let partitions_source = ScheduledPartitionsSource::new(Arc::clone(&scheduler)); - let partition_done_sink: Arc = if config.shadow_mode { - Arc::new(MockPartitionDoneSink::new()) - } else { - Arc::new(CatalogPartitionDoneSink::new( - config.backoff_config.clone(), - Arc::clone(&config.catalog), - )) - }; + let commit = CommitToScheduler::new(Arc::clone(&scheduler)); - let commit: Arc = if config.shadow_mode { - Arc::new(MockCommit::new()) - } else { - Arc::new(CatalogCommit::new( - config.backoff_config.clone(), - Arc::clone(&config.catalog), - )) - }; - - let commit = if let Some(commit_wrapper) = config.commit_wrapper.as_ref() { - commit_wrapper.wrap(commit) - } else { - commit - }; - - let (partitions_source, partition_done_sink) = - unique_partitions(partitions_source, partition_done_sink, 1); - - let (partitions_source, commit, partition_done_sink) = throttle_partition( - partitions_source, - commit, - partition_done_sink, - Arc::clone(&config.time_provider), - Duration::from_secs(60), - 1, - ); - - let commit = Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new( - commit, - &config.metric_registry, - ))); + let partition_done_sink = PartitionDoneSinkToScheduler::new(Arc::clone(&scheduler)); + // compactors are responsible for error classification + // and any 
future decisions regarding graceful shutdown let partition_done_sink: Arc = if config.all_errors_are_fatal { Arc::new(partition_done_sink) } else { @@ -185,6 +148,7 @@ fn make_partitions_source_commit_partition_sink( }) .copied() .collect(), + scheduler, )) }; let partition_done_sink = Arc::new(LoggingPartitionDoneSinkWrapper::new( @@ -210,7 +174,7 @@ fn make_partitions_source_commit_partition_sink( )) }; - (partitions_source, commit, partition_done_sink) + (partitions_source, Arc::new(commit), partition_done_sink) } fn make_partition_stream( diff --git a/compactor/src/components/mod.rs b/compactor/src/components/mod.rs index 1ce2eea844..97ebb8957a 100644 --- a/compactor/src/components/mod.rs +++ b/compactor/src/components/mod.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use self::{ - changed_files_filter::ChangedFilesFilter, commit::Commit, df_plan_exec::DataFusionPlanExec, - df_planner::DataFusionPlanner, divide_initial::DivideInitial, file_classifier::FileClassifier, - ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink, + changed_files_filter::ChangedFilesFilter, commit::CommitToScheduler, + df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner, divide_initial::DivideInitial, + file_classifier::FileClassifier, ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink, partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource, partition_stream::PartitionStream, @@ -12,8 +12,7 @@ use self::{ }; pub mod changed_files_filter; -pub mod combos; -pub mod commit; +pub(crate) mod commit; pub mod df_plan_exec; pub mod df_planner; pub mod divide_initial; @@ -61,8 +60,8 @@ pub struct Components { pub post_classification_partition_filter: Arc, /// Records "partition is done" status for given partition. pub partition_done_sink: Arc, - /// Commits changes (i.e. deletion and creation) to the catalog. - pub commit: Arc, + /// Commits changes (i.e. deletion and creation). 
+ pub commit: Arc, /// Creates `PlanIR` that describes what files should be compacted and updated pub ir_planner: Arc, /// Creates an Execution plan for a `PlanIR` diff --git a/compactor/src/components/partition_done_sink/error_kind.rs b/compactor/src/components/partition_done_sink/error_kind.rs index 0848906215..23d938d2cd 100644 --- a/compactor/src/components/partition_done_sink/error_kind.rs +++ b/compactor/src/components/partition_done_sink/error_kind.rs @@ -1,6 +1,10 @@ -use std::{collections::HashSet, fmt::Display}; +use std::{collections::HashSet, fmt::Display, sync::Arc}; use async_trait::async_trait; +use compactor_scheduler::{ + CompactionJob, CompactionJobStatus, CompactionJobStatusResponse, CompactionJobStatusVariant, + ErrorKind as SchedulerErrorKind, Scheduler, +}; use data_types::PartitionId; use crate::error::{DynError, ErrorKind, ErrorKindExt}; @@ -14,14 +18,19 @@ where { kind: HashSet, inner: T, + scheduler: Arc, } impl ErrorKindPartitionDoneSinkWrapper where T: PartitionDoneSink, { - pub fn new(inner: T, kind: HashSet) -> Self { - Self { kind, inner } + pub fn new(inner: T, kind: HashSet, scheduler: Arc) -> Self { + Self { + kind, + inner, + scheduler, + } } } @@ -41,13 +50,42 @@ impl PartitionDoneSink for ErrorKindPartitionDoneSinkWrapper where T: PartitionDoneSink, { - async fn record(&self, partition: PartitionId, res: Result<(), DynError>) { + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError> { match res { Ok(()) => self.inner.record(partition, Ok(())).await, Err(e) if self.kind.contains(&e.classify()) => { - self.inner.record(partition, Err(e)).await; + let scheduler_error = match SchedulerErrorKind::from(e.classify()) { + SchedulerErrorKind::OutOfMemory => SchedulerErrorKind::OutOfMemory, + SchedulerErrorKind::ObjectStore => SchedulerErrorKind::ObjectStore, + SchedulerErrorKind::Timeout => SchedulerErrorKind::Timeout, + SchedulerErrorKind::Unknown(_) => SchedulerErrorKind::Unknown(e.to_string()), + }; + + match self + .scheduler + .update_job_status(CompactionJobStatus { + job: CompactionJob::new(partition), + status: CompactionJobStatusVariant::Error(scheduler_error), + }) + .await? 
+ { CompactionJobStatusResponse::Ack => {} + CompactionJobStatusResponse::CreatedParquetFiles(_) => { + unreachable!("scheduler should not create parquet files") + } + } + + self.inner.record(partition, Err(e)).await + } + Err(e) => { + // contract of this abstraction: + // we do not pass errors to `self.inner` if their kind is not in `self.kind` + Err(e) } - _ => {} } } } @@ -56,18 +94,24 @@ where mod tests { use std::{collections::HashMap, sync::Arc}; - use crate::components::partition_done_sink::mock::MockPartitionDoneSink; - + use compactor_scheduler::create_test_scheduler; use datafusion::error::DataFusionError; + use iox_tests::TestCatalog; + use iox_time::{MockProvider, Time}; use object_store::Error as ObjectStoreError; - use super::*; + use super::{super::mock::MockPartitionDoneSink, *}; #[test] fn test_display() { let sink = ErrorKindPartitionDoneSinkWrapper::new( MockPartitionDoneSink::new(), HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]), + create_test_scheduler( + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + None, + ), ); assert_eq!(sink.to_string(), "kind([ObjectStore, OutOfMemory], mock)"); } @@ -78,22 +122,33 @@ mod tests { let sink = ErrorKindPartitionDoneSinkWrapper::new( Arc::clone(&inner), HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]), + create_test_scheduler( + TestCatalog::new().catalog(), + Arc::new(MockProvider::new(Time::MIN)), + None, + ), ); sink.record( PartitionId::new(1), Err(Box::new(ObjectStoreError::NotImplemented)), ) - .await; + .await + .expect("record failed"); sink.record( PartitionId::new(2), Err(Box::new(DataFusionError::ResourcesExhausted(String::from( "foo", )))), ) - .await; - sink.record(PartitionId::new(3), Err("foo".into())).await; - sink.record(PartitionId::new(4), Ok(())).await; + .await + .expect("record failed"); + sink.record(PartitionId::new(3), Err("foo".into())) + .await + .unwrap_err(); + sink.record(PartitionId::new(4), Ok(())) + .await + .expect("record failed"); assert_eq!( inner.results(), diff --git a/compactor/src/components/partition_done_sink/logging.rs b/compactor/src/components/partition_done_sink/logging.rs index d4649567dc..80e837c978 100644 --- a/compactor/src/components/partition_done_sink/logging.rs +++ b/compactor/src/components/partition_done_sink/logging.rs @@ -39,12 +39,17 @@ impl PartitionDoneSink for LoggingPartitionDoneSinkWrapper where T: PartitionDoneSink, { - async fn record(&self, partition: PartitionId, res: Result<(), DynError>) { + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError> { match &res { Ok(()) => { info!(partition_id = partition.get(), "Finished partition",); } Err(e) => { + // log compactor errors, classified by compactor ErrorKind error!( %e, kind=e.classify().name(), @@ -53,7 +58,7 @@ where ); } } - self.inner.record(partition, res).await; + self.inner.record(partition, res).await } } @@ -64,9 +69,7 @@ mod tests { use std::sync::Arc; use object_store::Error as ObjectStoreError; use test_helpers::tracing::TracingCapture; - use crate::components::partition_done_sink::mock::MockPartitionDoneSink; - - use super::*; + use super::{super::mock::MockPartitionDoneSink, *}; #[test] fn test_display() { @@ -81,14 +84,21 @@ mod tests { let capture = TracingCapture::new(); - sink.record(PartitionId::new(1), Err("msg 1".into())).await; - sink.record(PartitionId::new(2), Err("msg 2".into())).await; + sink.record(PartitionId::new(1), Err("msg 1".into())) + .await + .expect("record failed"); +
sink.record(PartitionId::new(2), Err("msg 2".into())) + .await + .expect("record failed"); sink.record( PartitionId::new(1), Err(Box::new(ObjectStoreError::NotImplemented)), ) - .await; - sink.record(PartitionId::new(3), Ok(())).await; + .await + .expect("record failed"); + sink.record(PartitionId::new(3), Ok(())) + .await + .expect("record failed"); assert_eq!( capture.to_string(), diff --git a/compactor/src/components/partition_done_sink/metrics.rs b/compactor/src/components/partition_done_sink/metrics.rs index 1999e8e0f4..10cb99860c 100644 --- a/compactor/src/components/partition_done_sink/metrics.rs +++ b/compactor/src/components/partition_done_sink/metrics.rs @@ -62,12 +62,17 @@ impl PartitionDoneSink for MetricsPartitionDoneSinkWrapper where T: PartitionDoneSink, { - async fn record(&self, partition: PartitionId, res: Result<(), DynError>) { + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError> { match &res { Ok(()) => { self.ok_counter.inc(1); } Err(e) => { + // classify and track counts of compactor ErrorKind let kind = e.classify(); self.error_counter .get(&kind) @@ -75,7 +80,7 @@ where .inc(1); } } - self.inner.record(partition, res).await; + self.inner.record(partition, res).await } } @@ -86,9 +91,7 @@ mod tests { use metric::{assert_counter, Attributes}; use object_store::Error as ObjectStoreError; - use crate::components::partition_done_sink::mock::MockPartitionDoneSink; - - use super::*; + use super::{super::mock::MockPartitionDoneSink, *}; #[test] fn test_display() { @@ -107,14 +110,21 @@ mod tests { assert_error_counter(®istry, "unknown", 0); assert_error_counter(®istry, "object_store", 0); - sink.record(PartitionId::new(1), Err("msg 1".into())).await; - sink.record(PartitionId::new(2), Err("msg 2".into())).await; + sink.record(PartitionId::new(1), Err("msg 1".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(2), Err("msg 2".into())) + .await + .expect("record failed"); sink.record( PartitionId::new(1), Err(Box::new(ObjectStoreError::NotImplemented)), ) - .await; - sink.record(PartitionId::new(3), Ok(())).await; + .await + .expect("record failed"); + sink.record(PartitionId::new(3), Ok(())) + .await + .expect("record failed"); assert_ok_counter(®istry, 1); assert_error_counter(®istry, "unknown", 2); diff --git a/compactor/src/components/partition_done_sink/mock.rs b/compactor/src/components/partition_done_sink/mock.rs index b906a8b520..bce461ef5e 100644 --- a/compactor/src/components/partition_done_sink/mock.rs +++ b/compactor/src/components/partition_done_sink/mock.rs @@ -3,21 +3,23 @@ use std::{collections::HashMap, fmt::Display, sync::Mutex}; use async_trait::async_trait; use data_types::PartitionId; -use crate::error::DynError; - -use super::PartitionDoneSink; +use super::{DynError, PartitionDoneSink}; +/// Mock for [`PartitionDoneSink`]. #[derive(Debug, Default)] pub struct MockPartitionDoneSink { last: Mutex>>, } impl MockPartitionDoneSink { + /// Create new mock. + #[allow(dead_code)] // used for testing pub fn new() -> Self { Self::default() } - #[allow(dead_code)] // not used anywhere + /// Get the last recorded results. 
+ #[allow(dead_code)] // used for testing pub fn results(&self) -> HashMap> { self.last.lock().expect("not poisoned").clone() } @@ -31,11 +33,16 @@ impl Display for MockPartitionDoneSink { #[async_trait] impl PartitionDoneSink for MockPartitionDoneSink { - async fn record(&self, partition: PartitionId, res: Result<(), DynError>) { + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError> { self.last .lock() .expect("not poisoned") .insert(partition, res.map_err(|e| e.to_string())); + Ok(()) } } @@ -54,10 +61,18 @@ mod tests { assert_eq!(sink.results(), HashMap::default(),); - sink.record(PartitionId::new(1), Err("msg 1".into())).await; - sink.record(PartitionId::new(2), Err("msg 2".into())).await; - sink.record(PartitionId::new(1), Err("msg 3".into())).await; - sink.record(PartitionId::new(3), Ok(())).await; + sink.record(PartitionId::new(1), Err("msg 1".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(2), Err("msg 2".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(1), Err("msg 3".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(3), Ok(())) + .await + .expect("record failed"); assert_eq!( sink.results(), diff --git a/compactor/src/components/partition_done_sink/mod.rs b/compactor/src/components/partition_done_sink/mod.rs index 40d1772222..ecd4573ac4 100644 --- a/compactor/src/components/partition_done_sink/mod.rs +++ b/compactor/src/components/partition_done_sink/mod.rs @@ -6,13 +6,13 @@ use std::{ use async_trait::async_trait; use data_types::PartitionId; -use crate::error::DynError; +use crate::DynError; -pub mod catalog; pub mod error_kind; pub mod logging; pub mod metrics; pub mod mock; +pub mod outcome; /// Records "partition is done" status for given partition. #[async_trait] @@ -20,7 +20,11 @@ pub trait PartitionDoneSink: Debug + Display + Send + Sync { /// Record "partition is done" status for given partition. /// /// This method should retry. 
- async fn record(&self, partition: PartitionId, res: Result<(), DynError>); + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError>; } #[async_trait] @@ -28,7 +32,11 @@ impl PartitionDoneSink for Arc where T: PartitionDoneSink + ?Sized, { - async fn record(&self, partition: PartitionId, res: Result<(), DynError>) { + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError> { self.as_ref().record(partition, res).await } } diff --git a/compactor/src/components/partition_done_sink/outcome.rs b/compactor/src/components/partition_done_sink/outcome.rs new file mode 100644 index 0000000000..6f7a9e8ca1 --- /dev/null +++ b/compactor/src/components/partition_done_sink/outcome.rs @@ -0,0 +1,47 @@ +use std::{fmt::Display, sync::Arc}; + +use async_trait::async_trait; +use compactor_scheduler::{ + CompactionJob, CompactionJobEnd, CompactionJobEndVariant, Scheduler, SkipReason, +}; +use data_types::PartitionId; + +use crate::DynError; + +use super::PartitionDoneSink; + +#[derive(Debug)] +pub struct PartitionDoneSinkToScheduler { + scheduler: Arc, +} + +impl PartitionDoneSinkToScheduler { + pub fn new(scheduler: Arc) -> Self { + Self { scheduler } + } +} + +impl Display for PartitionDoneSinkToScheduler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PartitionDoneSinkToScheduler") + } +} + +#[async_trait] +impl PartitionDoneSink for PartitionDoneSinkToScheduler { + async fn record( + &self, + partition: PartitionId, + res: Result<(), DynError>, + ) -> Result<(), DynError> { + let end_action = CompactionJobEnd { + job: CompactionJob::new(partition), + end_action: match res { + Ok(_) => CompactionJobEndVariant::Complete, + Err(e) => CompactionJobEndVariant::RequestToSkip(SkipReason(e.to_string())), + }, + }; + + self.scheduler.end_job(end_action).await + } +} diff --git a/compactor/src/components/report.rs b/compactor/src/components/report.rs index b47d67af55..b790a1c760 100644 --- a/compactor/src/components/report.rs +++ b/compactor/src/components/report.rs @@ -34,7 +34,6 @@ pub fn log_config(config: &Config) { min_num_l1_files_to_compact, process_once, parquet_files_sink_override, - commit_wrapper, simulate_without_object_store, all_errors_are_fatal, max_num_columns_per_table, @@ -47,8 +46,6 @@ pub fn log_config(config: &Config) { .map(|_| "Some") .unwrap_or("None"); - let commit_wrapper = commit_wrapper.as_ref().map(|_| "Some").unwrap_or("None"); - info!( %catalog, %scheduler_config, @@ -71,7 +68,6 @@ pub fn log_config(config: &Config) { process_once, simulate_without_object_store, %parquet_files_sink_override, - %commit_wrapper, all_errors_are_fatal, max_num_columns_per_table, max_num_files_per_plan, diff --git a/compactor/src/config.rs b/compactor/src/config.rs index 0d977fd56a..e57c8e35a3 100644 --- a/compactor/src/config.rs +++ b/compactor/src/config.rs @@ -8,7 +8,7 @@ use iox_query::exec::Executor; use iox_time::TimeProvider; use parquet_file::storage::ParquetStorage; -use crate::components::{commit::CommitWrapper, parquet_files_sink::ParquetFilesSink}; +use crate::components::parquet_files_sink::ParquetFilesSink; /// Multiple from `max_desired_file_size_bytes` to compute the minimum value for /// `max_compact_size_bytes`. 
Since `max_desired_file_size_bytes` is softly enforced, actual file @@ -122,11 +122,6 @@ pub struct Config { /// (used for testing) pub parquet_files_sink_override: Option>, - /// Optionally wrap the `Commit` instance - /// - /// This is mostly used for testing - pub commit_wrapper: Option>, - /// Ensure that ALL errors (including object store errors) result in "skipped" partitions. /// /// This is mostly useful for testing. diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs index d50bf21509..4bb73f4f0a 100644 --- a/compactor/src/driver.rs +++ b/compactor/src/driver.rs @@ -82,7 +82,7 @@ async fn compact_partition( scratchpad, transmit_progress_signal, ) - .await + .await // errors detected in the CompactionJob update_job_status() will be handled in the timeout_with_progress_checking } }) .await; @@ -104,7 +104,9 @@ async fn compact_partition( // to the `skipped_compactions` table or not. TimeoutWithProgress::Completed(res) => res, }; - components + + // TODO: how to handle errors detected in the CompactionJob ending actions? + let _ = components .partition_done_sink .record(partition_id, res) .await; @@ -423,7 +425,7 @@ async fn execute_branch( created_file_params, target_level, ) - .await; + .await?; // we only need to upgrade files on the first iteration, so empty the upgrade list for next loop. upgrade = Vec::new(); @@ -631,7 +633,7 @@ async fn update_catalog( files_to_upgrade: Vec, file_params_to_create: Vec, target_level: CompactionLevel, -) -> (Vec, Vec) { +) -> Result<(Vec, Vec), DynError> { let current_parquet_file_state = fetch_and_save_parquet_file_state(&components, partition_id).await; @@ -649,7 +651,7 @@ async fn update_catalog( &file_params_to_create, target_level, ) - .await; + .await?; // Update created ids to their corresponding file params let created_file_params = file_params_to_create @@ -667,7 +669,7 @@ async fn update_catalog( }) .collect::>(); - (created_file_params, upgraded_files) + Ok((created_file_params, upgraded_files)) } // SINGLE_THREADED_COLUMN_COUNT is the number of columns requiring a partition be compacted single threaded. diff --git a/compactor/src/error.rs b/compactor/src/error.rs index 3ae7be1b2b..a1d687165b 100644 --- a/compactor/src/error.rs +++ b/compactor/src/error.rs @@ -1,5 +1,6 @@ //! Error handling. 
+use compactor_scheduler::ErrorKind as SchedulerErrorKind; use datafusion::{arrow::error::ArrowError, error::DataFusionError, parquet::errors::ParquetError}; use object_store::Error as ObjectStoreError; use std::{error::Error, fmt::Display, sync::Arc}; @@ -51,6 +52,17 @@ impl ErrorKind { } } +impl From for SchedulerErrorKind { + fn from(e: ErrorKind) -> Self { + match e { + ErrorKind::ObjectStore => Self::ObjectStore, + ErrorKind::OutOfMemory => Self::OutOfMemory, + ErrorKind::Timeout => Self::Timeout, + ErrorKind::Unknown => Self::Unknown("".into()), + } + } +} + impl Display for ErrorKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name()) diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 711f6118b4..2e46324577 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -221,12 +221,8 @@ mod round_info; // publically expose items needed for testing pub use components::{ - commit::{Commit, CommitWrapper}, - df_planner::panic::PanicDataFusionPlanner, - hardcoded::hardcoded_components, - namespaces_source::mock::NamespaceWrapper, - parquet_files_sink::ParquetFilesSink, - Components, + df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components, + namespaces_source::mock::NamespaceWrapper, parquet_files_sink::ParquetFilesSink, Components, }; pub use driver::compact; pub use error::DynError; diff --git a/compactor_scheduler/Cargo.toml b/compactor_scheduler/Cargo.toml index 1e816ca951..e0250819a4 100644 --- a/compactor_scheduler/Cargo.toml +++ b/compactor_scheduler/Cargo.toml @@ -9,13 +9,20 @@ license.workspace = true async-trait = "0.1.72" backoff = { path = "../backoff" } data_types = { path = "../data_types" } +futures = "0.3" iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } +itertools = "0.11.0" +metric = { path = "../metric" } observability_deps = { path = "../observability_deps" } +parking_lot = "0.12.1" sharder = { path = "../sharder" } +thiserror = "1.0" uuid = { version = "1", features = ["v4"] } workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] +assert_matches = "1.5.0" iox_tests = { path = "../iox_tests" } +test_helpers = { path = "../test_helpers"} tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } diff --git a/compactor/src/components/commit/logging.rs b/compactor_scheduler/src/commit/logging.rs similarity index 92% rename from compactor/src/components/commit/logging.rs rename to compactor_scheduler/src/commit/logging.rs index 0bd1789372..439e928d45 100644 --- a/compactor/src/components/commit/logging.rs +++ b/compactor_scheduler/src/commit/logging.rs @@ -4,10 +4,10 @@ use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; use observability_deps::tracing::info; -use super::Commit; +use super::{Commit, Error}; #[derive(Debug)] -pub struct LoggingCommitWrapper +pub(crate) struct LoggingCommitWrapper where T: Commit, { @@ -18,7 +18,7 @@ impl LoggingCommitWrapper where T: Commit, { - pub fn new(inner: T) -> Self { + pub(crate) fn new(inner: T) -> Self { Self { inner } } } @@ -44,12 +44,12 @@ where upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec { + ) -> Result, Error> { // Perform commit first and report status AFTERWARDS. 
let created = self .inner .commit(partition_id, delete, upgrade, create, target_level) - .await; + .await?; // Log numbers BEFORE IDs because the list may be so long that we hit the line-length limit. In this case we at // least have the important information. Note that the message always is printed first, so we'll never loose @@ -72,7 +72,7 @@ where "committed parquet file change", ); - created + Ok(created) } } @@ -80,10 +80,11 @@ where mod tests { use std::sync::Arc; + use assert_matches::assert_matches; use test_helpers::tracing::TracingCapture; use super::*; - use crate::components::commit::mock::{CommitHistoryEntry, MockCommit}; + use crate::commit::mock::{CommitHistoryEntry, MockCommit}; use iox_tests::ParquetFileBuilder; #[test] @@ -124,9 +125,9 @@ mod tests { CompactionLevel::Final, ) .await; - assert_eq!( + assert_matches!( ids, - vec![ParquetFileId::new(1000), ParquetFileId::new(1001)] + Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)] ); let ids = commit @@ -138,7 +139,7 @@ mod tests { CompactionLevel::Final, ) .await; - assert_eq!(ids, vec![]); + assert_matches!(ids, Ok(res) if res == vec![]); assert_eq!( capture.to_string(), diff --git a/compactor/src/components/commit/metrics.rs b/compactor_scheduler/src/commit/metrics.rs similarity index 96% rename from compactor/src/components/commit/metrics.rs rename to compactor_scheduler/src/commit/metrics.rs index 4a63cec426..0f0b73d2cc 100644 --- a/compactor/src/components/commit/metrics.rs +++ b/compactor_scheduler/src/commit/metrics.rs @@ -5,7 +5,7 @@ use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, use itertools::Itertools; use metric::{Registry, U64Histogram, U64HistogramOptions}; -use super::Commit; +use super::{Commit, Error}; #[derive(Debug, Clone, Copy)] enum HistogramType { @@ -102,7 +102,7 @@ impl Histogram { } #[derive(Debug)] -pub struct MetricsCommitWrapper +pub(crate) struct MetricsCommitWrapper where T: Commit, { @@ -124,7 +124,7 @@ impl MetricsCommitWrapper where T: Commit, { - pub fn new(inner: T, registry: &Registry) -> Self { + pub(crate) fn new(inner: T, registry: &Registry) -> Self { Self { file_bytes: Histogram::new( registry, @@ -182,12 +182,12 @@ where upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec { + ) -> Result, Error> { // Perform commit first and report status AFTERWARDS. 
let ids = self .inner .commit(partition_id, delete, upgrade, create, target_level) - .await; + .await?; // per file metrics for f in create { @@ -297,7 +297,7 @@ where .record(upgrade.iter().map(|f| f.row_count as u64).sum::()); } - ids + Ok(ids) } } @@ -305,9 +305,10 @@ where mod tests { use std::sync::Arc; + use assert_matches::assert_matches; use metric::{assert_histogram, Attributes}; - use crate::components::commit::mock::{CommitHistoryEntry, MockCommit}; + use crate::commit::mock::{CommitHistoryEntry, MockCommit}; use iox_tests::ParquetFileBuilder; use super::*; @@ -398,7 +399,7 @@ mod tests { CompactionLevel::FileNonOverlapped, ) .await; - assert_eq!(ids, vec![ParquetFileId::new(1000)]); + assert_matches!(ids, Ok(res) if res == vec![ParquetFileId::new(1000)]); let ids = commit .commit( @@ -409,7 +410,7 @@ mod tests { CompactionLevel::Final, ) .await; - assert_eq!(ids, vec![]); + assert_matches!(ids, Ok(res) if res == vec![]); assert_histogram!( registry, diff --git a/compactor/src/components/commit/mock.rs b/compactor_scheduler/src/commit/mock.rs similarity index 79% rename from compactor/src/components/commit/mock.rs rename to compactor_scheduler/src/commit/mock.rs index 17a68c33b8..a2e6e43631 100644 --- a/compactor/src/components/commit/mock.rs +++ b/compactor_scheduler/src/commit/mock.rs @@ -1,34 +1,32 @@ use std::{ fmt::Display, - sync::{ - atomic::{AtomicI64, Ordering}, - Mutex, - }, + sync::atomic::{AtomicI64, Ordering}, }; use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; +use parking_lot::Mutex; -use super::Commit; +use super::{Commit, Error}; #[derive(Debug, PartialEq, Eq, Clone)] -pub struct CommitHistoryEntry { - pub partition_id: PartitionId, - pub delete: Vec, - pub upgrade: Vec, - pub created: Vec, - pub target_level: CompactionLevel, +pub(crate) struct CommitHistoryEntry { + pub(crate) partition_id: PartitionId, + pub(crate) delete: Vec, + pub(crate) upgrade: Vec, + pub(crate) created: Vec, + pub(crate) target_level: CompactionLevel, } -#[derive(Debug)] -pub struct MockCommit { +#[derive(Debug, Default)] +pub(crate) struct MockCommit { history: Mutex>, id_counter: AtomicI64, } impl MockCommit { #[allow(dead_code)] // not used anywhere - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self { history: Default::default(), id_counter: AtomicI64::new(1000), @@ -36,8 +34,8 @@ impl MockCommit { } #[allow(dead_code)] // not used anywhere - pub fn history(&self) -> Vec { - self.history.lock().expect("not poisoned").clone() + pub(crate) fn history(&self) -> Vec { + self.history.lock().clone() } } @@ -56,7 +54,7 @@ impl Commit for MockCommit { upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec { + ) -> Result, Error> { let (created, ids): (Vec<_>, Vec<_>) = create .iter() .map(|params| { @@ -66,23 +64,21 @@ impl Commit for MockCommit { }) .unzip(); - self.history - .lock() - .expect("not poisoned") - .push(CommitHistoryEntry { - partition_id, - delete: delete.to_vec(), - upgrade: upgrade.to_vec(), - created, - target_level, - }); + self.history.lock().push(CommitHistoryEntry { + partition_id, + delete: delete.to_vec(), + upgrade: upgrade.to_vec(), + created, + target_level, + }); - ids + Ok(ids) } } #[cfg(test)] mod tests { + use assert_matches::assert_matches; use iox_tests::ParquetFileBuilder; use super::*; @@ -119,9 +115,9 @@ mod tests { CompactionLevel::FileNonOverlapped, ) .await; - assert_eq!( + assert_matches!( ids, - 
vec![ParquetFileId::new(1000), ParquetFileId::new(1001)] + Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)] ); let ids = commit @@ -133,7 +129,10 @@ mod tests { CompactionLevel::Final, ) .await; - assert_eq!(ids, vec![ParquetFileId::new(1002)]); + assert_matches!( + ids, + Ok(res) if res == vec![ParquetFileId::new(1002)] + ); let ids = commit .commit( @@ -144,7 +143,10 @@ mod tests { CompactionLevel::FileNonOverlapped, ) .await; - assert_eq!(ids, vec![ParquetFileId::new(1003)]); + assert_matches!( + ids, + Ok(res) if res == vec![ParquetFileId::new(1003)] + ); // simulate fill implosion of the file (this may happen w/ delete predicates) let ids = commit @@ -156,7 +158,10 @@ mod tests { CompactionLevel::FileNonOverlapped, ) .await; - assert_eq!(ids, vec![]); + assert_matches!( + ids, + Ok(res) if res == vec![] + ); assert_eq!( commit.history(), diff --git a/compactor/src/components/commit/mod.rs b/compactor_scheduler/src/commit/mod.rs similarity index 68% rename from compactor/src/components/commit/mod.rs rename to compactor_scheduler/src/commit/mod.rs index de777ccefd..64ddffc110 100644 --- a/compactor/src/components/commit/mod.rs +++ b/compactor_scheduler/src/commit/mod.rs @@ -6,10 +6,25 @@ use std::{ use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; -pub mod catalog; -pub mod logging; -pub mod metrics; -pub mod mock; +pub(crate) mod logging; +pub(crate) mod metrics; +pub(crate) mod mock; + +/// Error returned by [`Commit`] implementations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Commit request was malformed + #[error("Bad commit request: {0}")] + BadRequest(String), + + /// Commit succeeded, but catalog returned an invalid result + #[error("Result from catalog is invalid: {0}")] + InvalidCatalogResult(String), + + /// Commit failed because of an error in the throttler + #[error("Failure in throttler: {0}")] + ThrottlerError(#[from] crate::ThrottleError), +} /// Ensures that the file change (i.e. deletion and creation) are committed to the catalog. #[async_trait] @@ -27,7 +42,7 @@ pub trait Commit: Debug + Display + Send + Sync { upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec; + ) -> Result, crate::commit::Error>; } /// Something that can wrap `Commit` instances @@ -50,7 +65,7 @@ where upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec { + ) -> Result, crate::commit::Error> { self.as_ref() .commit(partition_id, delete, upgrade, create, target_level) .await diff --git a/compactor_scheduler/src/error.rs b/compactor_scheduler/src/error.rs new file mode 100644 index 0000000000..2847c7858a --- /dev/null +++ b/compactor_scheduler/src/error.rs @@ -0,0 +1,51 @@ +use thiserror::Error; + +/// Scheduler error. +#[derive(Debug, Error)] +pub(crate) enum Error { + #[error("Commit error: {0}")] + Commit(#[from] crate::commit::Error), + + #[error("ThrottleError error: {0}")] + ThrottleError(#[from] crate::ThrottleError), + + #[error("UniquePartitions error: {0}")] + UniqueSource(#[from] crate::UniquePartitionsError), +} + +/// Compactor Error classification. +/// What kind of error did we occur during compaction? +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum ErrorKind { + /// Could not access the object store. + ObjectStore, + + /// We ran out of memory (OOM). + OutOfMemory, + + /// Partition took too long. + Timeout, + + /// Unknown/unexpected error. 
+ /// + /// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it. + Unknown(String), +} + +impl ErrorKind { + /// Return static name. + pub fn name(&self) -> &'static str { + match self { + Self::ObjectStore => "object_store", + Self::OutOfMemory => "out_of_memory", + Self::Timeout => "timeout", + Self::Unknown(_) => "unknown", + } + } +} + +impl std::fmt::Display for ErrorKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} diff --git a/compactor_scheduler/src/lib.rs b/compactor_scheduler/src/lib.rs index 7d7d35e7d4..d482e4085c 100644 --- a/compactor_scheduler/src/lib.rs +++ b/compactor_scheduler/src/lib.rs @@ -26,16 +26,32 @@ use iox_time::TimeProvider; // Workaround for "unused crate" lint false positives. use workspace_hack as _; +pub(crate) mod commit; +pub(crate) use commit::mock::MockCommit; +pub use commit::{Commit, CommitWrapper, Error as CommitError}; + +mod error; +pub use error::ErrorKind; + mod local_scheduler; -pub(crate) use local_scheduler::{id_only_partition_filter::IdOnlyPartitionFilter, LocalScheduler}; -// configurations used externally during scheduler setup +#[allow(unused_imports)] // for testing +pub(crate) use local_scheduler::partition_done_sink::mock::MockPartitionDoneSink; pub use local_scheduler::{ + combos::throttle_partition::Error as ThrottleError, partitions_source_config::PartitionsSourceConfig, shard_config::ShardConfig, LocalSchedulerConfig, }; +pub(crate) use local_scheduler::{ + combos::unique_partitions::Error as UniquePartitionsError, + id_only_partition_filter::IdOnlyPartitionFilter, + partition_done_sink::Error as PartitionDoneSinkError, partition_done_sink::PartitionDoneSink, + LocalScheduler, +}; + // partitions_source trait mod partitions_source; pub use partitions_source::*; + // scheduler trait and associated types mod scheduler; pub use scheduler::*; @@ -49,6 +65,8 @@ pub fn create_scheduler( config: SchedulerConfig, catalog: Arc, time_provider: Arc, + metrics: Arc, + shadow_mode: bool, ) -> Arc { match config { SchedulerConfig::Local(scheduler_config) => { @@ -57,6 +75,8 @@ pub fn create_scheduler( BackoffConfig::default(), catalog, time_provider, + metrics, + shadow_mode, ); Arc::new(scheduler) } @@ -75,13 +95,20 @@ pub fn create_test_scheduler( let scheduler_config = match mocked_partition_ids { None => SchedulerConfig::default(), Some(partition_ids) => SchedulerConfig::Local(LocalSchedulerConfig { + commit_wrapper: None, partitions_source_config: PartitionsSourceConfig::Fixed( partition_ids.into_iter().collect::>(), ), shard_config: None, }), }; - create_scheduler(scheduler_config, catalog, time_provider) + create_scheduler( + scheduler_config, + catalog, + time_provider, + Arc::new(metric::Registry::default()), + false, + ) } #[cfg(test)] diff --git a/compactor_scheduler/src/local_scheduler.rs b/compactor_scheduler/src/local_scheduler.rs index 2aa917ef1f..c17d0c0bf9 100644 --- a/compactor_scheduler/src/local_scheduler.rs +++ b/compactor_scheduler/src/local_scheduler.rs @@ -1,26 +1,37 @@ //! Internals used by [`LocalScheduler`]. 
+pub(crate) mod catalog_commit; +pub(crate) mod combos; pub(crate) mod id_only_partition_filter; +pub(crate) mod partition_done_sink; pub(crate) mod partitions_source; pub(crate) mod partitions_source_config; pub(crate) mod shard_config; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use backoff::BackoffConfig; use iox_catalog::interface::Catalog; use iox_time::TimeProvider; -use observability_deps::tracing::info; +use observability_deps::tracing::{info, warn}; use crate::{ - CompactionJob, MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler, - ShardConfig, + commit::{logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper}, + Commit, CommitUpdate, CommitWrapper, CompactionJob, CompactionJobEnd, CompactionJobEndVariant, + CompactionJobStatus, CompactionJobStatusResponse, CompactionJobStatusVariant, MockCommit, + MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler, ShardConfig, + SkipReason, }; use self::{ + catalog_commit::CatalogCommit, + combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions}, id_only_partition_filter::{ and::AndIdOnlyPartitionFilter, shard::ShardPartitionFilter, IdOnlyPartitionFilter, }, + partition_done_sink::{ + catalog::CatalogPartitionDoneSink, mock::MockPartitionDoneSink, PartitionDoneSink, + }, partitions_source::{ catalog_all::CatalogAllPartitionsSource, catalog_to_compact::CatalogToCompactPartitionsSource, @@ -31,6 +42,10 @@ use self::{ /// Configuration specific to the local scheduler. #[derive(Debug, Default, Clone)] pub struct LocalSchedulerConfig { + /// Optionally wrap the `Commit` instance + /// + /// This is mostly used for testing + pub commit_wrapper: Option>, /// The partitions source config used by the local sceduler. pub partitions_source_config: PartitionsSourceConfig, /// The shard config used by the local sceduler. @@ -40,8 +55,14 @@ pub struct LocalSchedulerConfig { /// Implementation of the scheduler for local (per compactor) scheduling. #[derive(Debug)] pub(crate) struct LocalScheduler { + /// Commits changes (i.e. deletion and creation) to the catalog + pub(crate) commit: Arc, /// The partitions source to use for scheduling. partitions_source: Arc, + /// The actions to take when a partition is done. + /// + /// Includes partition (PartitionId) tracking of uniqueness and throttling. + partition_done_sink: Arc, /// The shard config used for generating the PartitionsSource. 
shard_config: Option, } @@ -53,7 +74,47 @@ impl LocalScheduler { backoff_config: BackoffConfig, catalog: Arc, time_provider: Arc, + metrics: Arc, + shadow_mode: bool, ) -> Self { + let commit = Self::build_commit( + config.clone(), + backoff_config.clone(), + Arc::clone(&catalog), + metrics, + shadow_mode, + ); + + let partitions_source = Self::build_partitions_source( + config.clone(), + backoff_config.clone(), + Arc::clone(&catalog), + Arc::clone(&time_provider), + ); + + let (partitions_source, commit, partition_done_sink) = Self::build_partition_done_sink( + partitions_source, + commit, + backoff_config, + catalog, + time_provider, + shadow_mode, + ); + + Self { + commit, + partitions_source, + partition_done_sink, + shard_config: config.shard_config, + } + } + + fn build_partitions_source( + config: LocalSchedulerConfig, + backoff_config: BackoffConfig, + catalog: Arc, + time_provider: Arc, + ) -> Arc { let shard_config = config.shard_config; let partitions_source: Arc = match &config.partitions_source_config { PartitionsSourceConfig::CatalogRecentWrites { threshold } => { @@ -86,16 +147,75 @@ impl LocalScheduler { shard_config.shard_id, ))); } - let partitions_source: Arc = - Arc::new(FilterPartitionsSourceWrapper::new( - AndIdOnlyPartitionFilter::new(id_only_partition_filters), - partitions_source, - )); - - Self { + Arc::new(FilterPartitionsSourceWrapper::new( + AndIdOnlyPartitionFilter::new(id_only_partition_filters), partitions_source, - shard_config, - } + )) + } + + fn build_partition_done_sink( + partitions_source: Arc, + commit: Arc, + backoff_config: BackoffConfig, + catalog: Arc, + time_provider: Arc, + shadow_mode: bool, + ) -> ( + Arc, + Arc, + Arc, + ) { + let partition_done_sink: Arc = if shadow_mode { + Arc::new(MockPartitionDoneSink::new()) + } else { + Arc::new(CatalogPartitionDoneSink::new( + backoff_config, + Arc::clone(&catalog), + )) + }; + + let (partitions_source, partition_done_sink) = + unique_partitions(partitions_source, partition_done_sink, 1); + + let (partitions_source, commit, partition_done_sink) = throttle_partition( + partitions_source, + commit, + partition_done_sink, + Arc::clone(&time_provider), + Duration::from_secs(60), + 1, + ); + + ( + Arc::new(partitions_source), + Arc::new(commit), + Arc::new(partition_done_sink), + ) + } + + fn build_commit( + config: LocalSchedulerConfig, + backoff_config: BackoffConfig, + catalog: Arc, + metrics_registry: Arc, + shadow_mode: bool, + ) -> Arc { + let commit: Arc = if shadow_mode { + Arc::new(MockCommit::new()) + } else { + Arc::new(CatalogCommit::new(backoff_config, Arc::clone(&catalog))) + }; + + let commit = if let Some(commit_wrapper) = &config.commit_wrapper { + commit_wrapper.wrap(commit) + } else { + commit + }; + + Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new( + commit, + &metrics_registry, + ))) } } @@ -109,6 +229,57 @@ impl Scheduler for LocalScheduler { .map(CompactionJob::new) .collect() } + + async fn update_job_status( + &self, + job_status: CompactionJobStatus, + ) -> Result> { + match job_status.status { + CompactionJobStatusVariant::Update(commit_update) => { + let CommitUpdate { + partition_id, + delete, + upgrade, + target_level, + create, + } = commit_update; + + let result = self + .commit + .commit(partition_id, &delete, &upgrade, &create, target_level) + .await?; + + Ok(CompactionJobStatusResponse::CreatedParquetFiles(result)) + } + CompactionJobStatusVariant::Error(error_kind) => { + warn!("Error processing job: {:?}: {}", job_status.job, error_kind); + 
Ok(CompactionJobStatusResponse::Ack) + } + } + } + + async fn end_job( + &self, + end: CompactionJobEnd, + ) -> Result<(), Box> { + match end.end_action { + CompactionJobEndVariant::RequestToSkip(SkipReason(msg)) => { + self.partition_done_sink + .record(end.job.partition_id, Err(msg.into())) + .await?; + + Ok(()) + } + CompactionJobEndVariant::Complete => { + self.partition_done_sink + .record(end.job.partition_id, Ok(())) + .await?; + // TODO: once uuid is handled properly, we can track the job completion + + Ok(()) + } + } + } } impl std::fmt::Display for LocalScheduler { @@ -134,6 +305,8 @@ mod tests { BackoffConfig::default(), TestCatalog::new().catalog(), Arc::new(MockProvider::new(Time::MIN)), + Arc::new(metric::Registry::default()), + false, ); assert_eq!(scheduler.to_string(), "local_compaction_scheduler",); @@ -147,6 +320,7 @@ mod tests { }); let config = LocalSchedulerConfig { + commit_wrapper: None, partitions_source_config: PartitionsSourceConfig::default(), shard_config, }; @@ -156,6 +330,8 @@ mod tests { BackoffConfig::default(), TestCatalog::new().catalog(), Arc::new(MockProvider::new(Time::MIN)), + Arc::new(metric::Registry::default()), + false, ); assert_eq!( diff --git a/compactor/src/components/commit/catalog.rs b/compactor_scheduler/src/local_scheduler/catalog_commit.rs similarity index 51% rename from compactor/src/components/commit/catalog.rs rename to compactor_scheduler/src/local_scheduler/catalog_commit.rs index 97d9eb1415..b6dd1651d9 100644 --- a/compactor/src/components/commit/catalog.rs +++ b/compactor_scheduler/src/local_scheduler/catalog_commit.rs @@ -5,16 +5,16 @@ use backoff::{Backoff, BackoffConfig}; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; use iox_catalog::interface::Catalog; -use super::Commit; +use crate::{commit::Error, Commit}; #[derive(Debug)] -pub struct CatalogCommit { +pub(crate) struct CatalogCommit { backoff_config: BackoffConfig, catalog: Arc, } impl CatalogCommit { - pub fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { + pub(crate) fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { Self { backoff_config, catalog, @@ -37,13 +37,27 @@ impl Commit for CatalogCommit { upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec { - assert!(!upgrade.is_empty() || (!delete.is_empty() && !create.is_empty())); + ) -> Result, Error> { + let is_upgrade_commit = !upgrade.is_empty(); + let is_replacement_commit = !delete.is_empty() || !create.is_empty(); + let replacement_commit_is_ok = !delete.is_empty() && !create.is_empty(); + + match (is_upgrade_commit, is_replacement_commit) { + (false, false) => { + return Err(Error::BadRequest("commit must have files to upgrade, and/or a set of files to replace (delete and create)".into())); + } + (_, true) if !replacement_commit_is_ok => { + return Err(Error::BadRequest( + "replacement commits must have both files to delete and files to create".into(), + )); + } + _ => {} // is ok + } let delete = delete.iter().map(|f| f.id).collect::>(); let upgrade = upgrade.iter().map(|f| f.id).collect::>(); - Backoff::new(&self.backoff_config) + let result = Backoff::new(&self.backoff_config) .retry_all_errors("commit parquet file changes", || async { let mut repos = self.catalog.repositories().await; let parquet_files = repos.parquet_files(); @@ -54,6 +68,16 @@ impl Commit for CatalogCommit { Ok::<_, iox_catalog::interface::Error>(ids) }) .await - .expect("retry forever") + .expect("retry forever"); + + if 
result.len() != create.len() { + return Err(Error::InvalidCatalogResult(format!( + "Number of created parquet files is invalid: expected {} but found {}", + create.len(), + result.len() + ))); + } + + return Ok(result); } } diff --git a/compactor/src/components/combos/mod.rs b/compactor_scheduler/src/local_scheduler/combos/mod.rs similarity index 59% rename from compactor/src/components/combos/mod.rs rename to compactor_scheduler/src/local_scheduler/combos/mod.rs index efaf6439ec..b52a36ef14 100644 --- a/compactor/src/components/combos/mod.rs +++ b/compactor_scheduler/src/local_scheduler/combos/mod.rs @@ -1,7 +1,7 @@ //! Combinations of multiple components that together can achieve one goal. -pub mod throttle_partition; -pub mod unique_partitions; +pub(crate) mod throttle_partition; +pub(crate) mod unique_partitions; #[cfg(test)] mod tests; diff --git a/compactor/src/components/combos/tests.rs b/compactor_scheduler/src/local_scheduler/combos/tests.rs similarity index 77% rename from compactor/src/components/combos/tests.rs rename to compactor_scheduler/src/local_scheduler/combos/tests.rs index bf5d3cafb1..aa029da54b 100644 --- a/compactor/src/components/combos/tests.rs +++ b/compactor_scheduler/src/local_scheduler/combos/tests.rs @@ -1,15 +1,15 @@ use std::{sync::Arc, time::Duration}; -use compactor_scheduler::{MockPartitionsSource, PartitionsSource}; use data_types::{CompactionLevel, PartitionId}; use iox_time::{MockProvider, Time}; -use crate::components::{ - combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions}, - commit::{mock::MockCommit, Commit}, - partition_done_sink::{mock::MockPartitionDoneSink, PartitionDoneSink}, +use crate::{ + Commit, MockCommit, MockPartitionDoneSink, MockPartitionsSource, PartitionDoneSink, + PartitionsSource, }; +use super::{throttle_partition::throttle_partition, unique_partitions::unique_partitions}; + #[tokio::test] async fn test_unique_and_throttle() { let inner_source = Arc::new(MockPartitionsSource::new(vec![ @@ -44,9 +44,14 @@ async fn test_unique_and_throttle() { commit .commit(PartitionId::new(1), &[], &[], &[], CompactionLevel::Initial) - .await; - sink.record(PartitionId::new(1), Ok(())).await; - sink.record(PartitionId::new(2), Ok(())).await; + .await + .expect("commit failed"); + sink.record(PartitionId::new(1), Ok(())) + .await + .expect("record failed"); + sink.record(PartitionId::new(2), Ok(())) + .await + .expect("record failed"); inner_source.set(vec![ PartitionId::new(1), diff --git a/compactor/src/components/combos/throttle_partition.rs b/compactor_scheduler/src/local_scheduler/combos/throttle_partition.rs similarity index 83% rename from compactor/src/components/combos/throttle_partition.rs rename to compactor_scheduler/src/local_scheduler/combos/throttle_partition.rs index d108807a1e..c2af4174a0 100644 --- a/compactor/src/components/combos/throttle_partition.rs +++ b/compactor_scheduler/src/local_scheduler/combos/throttle_partition.rs @@ -1,19 +1,17 @@ //! Throttle partions that receive no commits. 
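Since `Commit::commit` is now fallible instead of panicking on unknown or already-done partitions, callers are expected to match on `CommitError`. A minimal sketch under that assumption follows; the `try_commit` helper and its log messages are illustrative only and not part of this patch.

```rust
use std::sync::Arc;

use compactor_scheduler::{Commit, CommitError};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use observability_deps::tracing::{info, warn};

/// Sketch: react to the now-fallible Commit API instead of aborting on a panic.
async fn try_commit(
    commit: Arc<dyn Commit>,
    partition_id: PartitionId,
    delete: Vec<ParquetFile>,
    create: Vec<ParquetFileParams>,
) {
    match commit
        .commit(partition_id, &delete, &[], &create, CompactionLevel::Final)
        .await
    {
        Ok(created) => info!(n_created = created.len(), "commit applied"),
        Err(CommitError::ThrottlerError(e)) => warn!(%e, "partition unknown or already done"),
        Err(e) => warn!(%e, "commit rejected"),
    }
}
```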
-use std::{ - collections::HashMap, - fmt::Display, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashMap, fmt::Display, sync::Arc, time::Duration}; use async_trait::async_trait; -use compactor_scheduler::PartitionsSource; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use iox_time::{Time, TimeProvider}; +use parking_lot::Mutex; -use crate::components::{commit::Commit, partition_done_sink::PartitionDoneSink}; +use crate::{ + local_scheduler::partition_done_sink::DynError, Commit, CommitError, PartitionDoneSink, + PartitionDoneSinkError, PartitionsSource, +}; /// Ensures that partitions that do not receive any commits are throttled. /// @@ -54,8 +52,8 @@ use crate::components::{commit::Commit, partition_done_sink::PartitionDoneSink}; /// concurrency of this bypass can be controlled via `bypass_concurrency`. /// /// This setup relies on a fact that it does not process duplicate [`PartitionId`]. You may use -/// [`unique_partitions`](crate::components::combos::unique_partitions::unique_partitions) to achieve that. -pub fn throttle_partition( +/// [`unique_partitions`](super::unique_partitions::unique_partitions) to achieve that. +pub(crate) fn throttle_partition( source: T1, commit: T2, sink: T3, @@ -94,6 +92,14 @@ where (source, commit, sink) } +/// Error returned by throttler abstractions. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed uniqueness check. + #[error("Unknown or already done partition: {0}")] + Uniqueness(PartitionId), +} + #[derive(Debug, Default)] struct State { // Value is "true" while compaction task is in-flight, and "false" once complete. @@ -107,7 +113,7 @@ struct State { type SharedState = Arc>; #[derive(Debug)] -pub struct ThrottlePartitionsSourceWrapper +pub(crate) struct ThrottlePartitionsSourceWrapper where T1: PartitionsSource, T2: PartitionDoneSink, @@ -139,7 +145,7 @@ where let res = self.inner_source.fetch().await; let (pass, throttle) = { - let mut guard = self.state.lock().expect("not poisoned"); + let mut guard = self.state.lock(); // ensure that in-flight data is non-overlapping for id in &res { @@ -177,10 +183,11 @@ where (pass, throttle) }; - futures::stream::iter(throttle) + // pass through the removal from tracking, since it failed in this fetch() call + let _ = futures::stream::iter(throttle) .map(|id| self.inner_sink.record(id, Ok(()))) .buffer_unordered(self.sink_concurrency) - .collect::<()>() + .try_collect::<()>() .await; pass @@ -188,7 +195,7 @@ where } #[derive(Debug)] -pub struct ThrottleCommitWrapper +pub(crate) struct ThrottleCommitWrapper where T: Commit, { @@ -217,9 +224,9 @@ where upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec { + ) -> Result, CommitError> { let known = { - let mut guard = self.state.lock().expect("not poisoned"); + let mut guard = self.state.lock(); match guard.in_flight.get_mut(&partition_id) { Some(val) => { *val = true; @@ -229,10 +236,9 @@ where } }; // perform check when NOT holding the mutex to not poison it - assert!( - known, - "Unknown or already done partition in commit: {partition_id}" - ); + if !known { + return Err(Error::Uniqueness(partition_id).into()); + } self.inner .commit(partition_id, delete, upgrade, create, target_level) @@ -241,7 +247,7 @@ where } #[derive(Debug)] -pub struct ThrottlePartitionDoneSinkWrapper +pub(crate) struct ThrottlePartitionDoneSinkWrapper where T: PartitionDoneSink, { @@ 
-268,10 +274,10 @@ where async fn record( &self, partition: PartitionId, - res: Result<(), Box>, - ) { + res: Result<(), DynError>, + ) -> Result<(), PartitionDoneSinkError> { let known = { - let mut guard = self.state.lock().expect("not poisoned"); + let mut guard = self.state.lock(); match guard.in_flight.remove(&partition) { Some(val) => { if !val { @@ -285,23 +291,21 @@ where } }; // perform check when NOT holding the mutex to not poison it - assert!( - known, - "Unknown or already done partition in partition done sink: {partition}" - ); + if !known { + return Err(Error::Uniqueness(partition).into()); + } - self.inner.record(partition, res).await; + self.inner.record(partition, res).await } } #[cfg(test)] mod tests { - use compactor_scheduler::MockPartitionsSource; + use assert_matches::assert_matches; use iox_time::MockProvider; - use crate::components::{ - commit::mock::{CommitHistoryEntry, MockCommit}, - partition_done_sink::mock::MockPartitionDoneSink, + use crate::{ + commit::mock::CommitHistoryEntry, MockCommit, MockPartitionDoneSink, MockPartitionsSource, }; use super::*; @@ -357,14 +361,20 @@ mod tests { // commit commit .commit(PartitionId::new(1), &[], &[], &[], CompactionLevel::Initial) - .await; + .await + .expect("commit failed"); commit .commit(PartitionId::new(2), &[], &[], &[], CompactionLevel::Initial) - .await; + .await + .expect("commit failed"); // record - sink.record(PartitionId::new(1), Ok(())).await; - sink.record(PartitionId::new(3), Ok(())).await; + sink.record(PartitionId::new(1), Ok(())) + .await + .expect("record failed"); + sink.record(PartitionId::new(3), Ok(())) + .await + .expect("record failed"); assert_eq!( inner_sink.results(), HashMap::from([(PartitionId::new(1), Ok(())), (PartitionId::new(3), Ok(())),]), @@ -405,9 +415,11 @@ mod tests { // record // can still finish partition 2 and 4 sink.record(PartitionId::new(2), Err(String::from("foo").into())) - .await; + .await + .expect("record failed"); sink.record(PartitionId::new(4), Err(String::from("bar").into())) - .await; + .await + .expect("record failed"); assert_eq!( inner_sink.results(), HashMap::from([ @@ -457,8 +469,7 @@ mod tests { } #[tokio::test] - #[should_panic(expected = "Unknown or already done partition in commit: 1")] - async fn test_panic_commit_unknown() { + async fn test_err_commit_unknown() { let (source, commit, sink) = throttle_partition( MockPartitionsSource::new(vec![PartitionId::new(1)]), MockCommit::new(), @@ -469,14 +480,18 @@ mod tests { ); source.fetch().await; - sink.record(PartitionId::new(1), Ok(())).await; - commit + sink.record(PartitionId::new(1), Ok(())) + .await + .expect("record failed"); + assert_matches!( + commit .commit(PartitionId::new(1), &[], &[], &[], CompactionLevel::Initial) - .await; + .await, + Err(CommitError::ThrottlerError(Error::Uniqueness(res))) if res == PartitionId::new(1) + ); } #[tokio::test] - #[should_panic(expected = "Unknown or already done partition in partition done sink: 1")] async fn test_panic_sink_unknown() { let (source, _commit, sink) = throttle_partition( MockPartitionsSource::new(vec![PartitionId::new(1)]), @@ -488,10 +503,20 @@ mod tests { ); source.fetch().await; - sink.record(PartitionId::new(1), Ok(())).await; - sink.record(PartitionId::new(1), Ok(())).await; + sink.record(PartitionId::new(1), Ok(())) + .await + .expect("record failed"); + + let err = sink.record(PartitionId::new(1), Ok(())).await; + assert_matches!( + err, + Err(PartitionDoneSinkError::Throttler(Error::Uniqueness(partition))) if partition == 
PartitionId::new(1), + "fails because partition 1 is already done" + ); } + // TODO: remove last legacy panic from scheduler + // Requires change of Scheduler.get_jobs() API to a Result> #[tokio::test] #[should_panic(expected = "Partition already in-flight: 1")] async fn test_panic_duplicate_in_flight() { diff --git a/compactor/src/components/combos/unique_partitions.rs b/compactor_scheduler/src/local_scheduler/combos/unique_partitions.rs similarity index 82% rename from compactor/src/components/combos/unique_partitions.rs rename to compactor_scheduler/src/local_scheduler/combos/unique_partitions.rs index 747fd3b512..c5e1908aa2 100644 --- a/compactor/src/components/combos/unique_partitions.rs +++ b/compactor_scheduler/src/local_scheduler/combos/unique_partitions.rs @@ -1,17 +1,13 @@ //! Ensure that partitions flowing through the pipeline are unique. -use std::{ - collections::HashSet, - fmt::Display, - sync::{Arc, Mutex}, -}; +use std::{collections::HashSet, fmt::Display, sync::Arc}; use async_trait::async_trait; -use compactor_scheduler::PartitionsSource; use data_types::PartitionId; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; +use parking_lot::Mutex; -use crate::components::partition_done_sink::PartitionDoneSink; +use crate::{local_scheduler::partition_done_sink::DynError, PartitionDoneSink, PartitionsSource}; /// Ensures that a unique set of partitions is flowing through the critical section of the compactor pipeline. /// @@ -32,7 +28,7 @@ use crate::components::partition_done_sink::PartitionDoneSink; /// /// | Step | Name | Type | Description | /// | ---- | --------------------- | ----------------------------------------------------------- | ----------- | -/// | 1 | **Actual source** | `inner_source`/`T1`/[`PartitionsSource`], wrapped | This is the actual source, e.g. a [schedule](crate::components::partitions_source::scheduled::ScheduledPartitionsSource) | +/// | 1 | **Actual source** | `inner_source`/`T1`/[`PartitionsSource`], wrapped | This is the actual source, e.g. a [schedule](crate::PartitionsSource) | /// | 2 | **Unique IDs source** | [`UniquePartionsSourceWrapper`], wraps `inner_source`/`T1` | Outputs that [`PartitionId`]s from the `inner_source` but filters out partitions that have not yet reached the uniqueness sink (step 4) | /// | 3 | **Critical section** | -- | Here it is always ensured that a single [`PartitionId`] does NOT occur more than once. | /// | 4 | **Unique IDs sink** | [`UniquePartitionDoneSinkWrapper`], wraps `inner_sink`/`T2` | Observes incoming IDs and removes them from the filter applied in step 2. | @@ -41,7 +37,7 @@ use crate::components::partition_done_sink::PartitionDoneSink; /// Note that partitions filtered out by [`UniquePartionsSourceWrapper`] will directly be forwarded to `inner_sink`. No /// partition is ever lost. This means that `inner_source` and `inner_sink` can perform proper accounting. The /// concurrency of this bypass can be controlled via `bypass_concurrency`. -pub fn unique_partitions( +pub(crate) fn unique_partitions( inner_source: T1, inner_sink: T2, bypass_concurrency: usize, @@ -68,10 +64,18 @@ where (source, sink) } +/// Error returned by uniqueness abstractions. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + /// Failed uniqueness check. 
+ #[error("Unknown or already done partition: {0}")] + Uniqueness(PartitionId), +} + type InFlight = Arc>>; #[derive(Debug)] -pub struct UniquePartionsSourceWrapper +pub(crate) struct UniquePartionsSourceWrapper where T1: PartitionsSource, T2: PartitionDoneSink, @@ -102,7 +106,7 @@ where let res = self.inner_source.fetch().await; let (unique, duplicates) = { - let mut guard = self.in_flight.lock().expect("not poisoned"); + let mut guard = self.in_flight.lock(); let mut unique = Vec::with_capacity(res.len()); let mut duplicates = Vec::with_capacity(res.len()); @@ -117,10 +121,11 @@ where (unique, duplicates) }; - futures::stream::iter(duplicates) + // pass through the removal from tracking, since it was marked as duplicate in this fetch() call + let _ = futures::stream::iter(duplicates) .map(|id| self.inner_sink.record(id, Ok(()))) .buffer_unordered(self.sink_concurrency) - .collect::<()>() + .try_collect::<()>() .await; unique @@ -128,7 +133,7 @@ where } #[derive(Debug)] -pub struct UniquePartitionDoneSinkWrapper +pub(crate) struct UniquePartitionDoneSinkWrapper where T: PartitionDoneSink, { @@ -153,17 +158,16 @@ where async fn record( &self, partition: PartitionId, - res: Result<(), Box>, - ) { + res: Result<(), DynError>, + ) -> Result<(), crate::PartitionDoneSinkError> { let existing = { - let mut guard = self.in_flight.lock().expect("not poisoned"); + let mut guard = self.in_flight.lock(); guard.remove(&partition) }; // perform check when NOT holding the mutex to not poison it - assert!( - existing, - "Unknown or already done partition in sink: {partition}" - ); + if !existing { + return Err(Error::Uniqueness(partition).into()); + } // perform inner last, because the wrapping order is: // @@ -172,7 +176,7 @@ where // - ... // - unique sink // - wrapped sink - self.inner.record(partition, res).await; + self.inner.record(partition, res).await } } @@ -180,9 +184,9 @@ where mod tests { use std::collections::HashMap; - use compactor_scheduler::MockPartitionsSource; + use assert_matches::assert_matches; - use crate::components::partition_done_sink::mock::MockPartitionDoneSink; + use crate::{MockPartitionDoneSink, MockPartitionsSource, PartitionDoneSinkError}; use super::*; @@ -227,8 +231,12 @@ mod tests { ); // record - sink.record(PartitionId::new(1), Ok(())).await; - sink.record(PartitionId::new(2), Ok(())).await; + sink.record(PartitionId::new(1), Ok(())) + .await + .expect("record failed"); + sink.record(PartitionId::new(2), Ok(())) + .await + .expect("record failed"); assert_eq!( inner_sink.results(), @@ -264,7 +272,8 @@ mod tests { // record sink.record(PartitionId::new(1), Err(String::from("foo").into())) - .await; + .await + .expect("record failed"); assert_eq!( inner_sink.results(), @@ -291,7 +300,6 @@ mod tests { } #[tokio::test] - #[should_panic(expected = "Unknown or already done partition in sink: 1")] async fn test_panic_sink_unknown() { let (source, sink) = unique_partitions( MockPartitionsSource::new(vec![PartitionId::new(1)]), @@ -301,7 +309,11 @@ mod tests { let ids = source.fetch().await; assert_eq!(ids.len(), 1); let id = ids[0]; - sink.record(id, Ok(())).await; - sink.record(id, Ok(())).await; + sink.record(id, Ok(())).await.expect("record failed"); + assert_matches!( + sink.record(id, Ok(())).await, + Err(PartitionDoneSinkError::UniquePartitions(Error::Uniqueness(partition))) if partition == PartitionId::new(1), + "fails because partition 1 is already done" + ); } } diff --git a/compactor/src/components/partition_done_sink/catalog.rs 
b/compactor_scheduler/src/local_scheduler/partition_done_sink/catalog.rs similarity index 81% rename from compactor/src/components/partition_done_sink/catalog.rs rename to compactor_scheduler/src/local_scheduler/partition_done_sink/catalog.rs index ab384d0ac0..36ba832614 100644 --- a/compactor/src/components/partition_done_sink/catalog.rs +++ b/compactor_scheduler/src/local_scheduler/partition_done_sink/catalog.rs @@ -5,18 +5,16 @@ use backoff::{Backoff, BackoffConfig}; use data_types::PartitionId; use iox_catalog::interface::Catalog; -use crate::error::DynError; - -use super::PartitionDoneSink; +use super::{DynError, Error, PartitionDoneSink}; #[derive(Debug)] -pub struct CatalogPartitionDoneSink { +pub(crate) struct CatalogPartitionDoneSink { backoff_config: BackoffConfig, catalog: Arc, } impl CatalogPartitionDoneSink { - pub fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { + pub(crate) fn new(backoff_config: BackoffConfig, catalog: Arc) -> Self { Self { backoff_config, catalog, @@ -32,7 +30,7 @@ impl Display for CatalogPartitionDoneSink { #[async_trait] impl PartitionDoneSink for CatalogPartitionDoneSink { - async fn record(&self, partition: PartitionId, res: Result<(), DynError>) { + async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error> { if let Err(e) = res { let msg = e.to_string(); @@ -47,7 +45,8 @@ impl PartitionDoneSink for CatalogPartitionDoneSink { .await }) .await - .expect("retry forever"); + .map_err(|e| Error::Catalog(e.to_string()))?; } + Ok(()) } } diff --git a/compactor_scheduler/src/local_scheduler/partition_done_sink/mock.rs b/compactor_scheduler/src/local_scheduler/partition_done_sink/mock.rs new file mode 100644 index 0000000000..1c81974755 --- /dev/null +++ b/compactor_scheduler/src/local_scheduler/partition_done_sink/mock.rs @@ -0,0 +1,81 @@ +use std::{collections::HashMap, fmt::Display}; + +use async_trait::async_trait; +use data_types::PartitionId; +use parking_lot::Mutex; + +use super::{DynError, Error, PartitionDoneSink}; + +/// Mock for [`PartitionDoneSink`]. +#[derive(Debug, Default)] +pub(crate) struct MockPartitionDoneSink { + last: Mutex>>, +} + +impl MockPartitionDoneSink { + /// Create new mock. + pub(crate) fn new() -> Self { + Self::default() + } + + /// Get the last recorded results. 
+ #[allow(dead_code)] // used for testing + pub(crate) fn results(&self) -> HashMap> { + self.last.lock().clone() + } +} + +impl Display for MockPartitionDoneSink { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "mock") + } +} + +#[async_trait] +impl PartitionDoneSink for MockPartitionDoneSink { + async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error> { + self.last + .lock() + .insert(partition, res.map_err(|e| e.to_string())); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_display() { + assert_eq!(MockPartitionDoneSink::new().to_string(), "mock",); + } + + #[tokio::test] + async fn test_record() { + let sink = MockPartitionDoneSink::new(); + + assert_eq!(sink.results(), HashMap::default(),); + + sink.record(PartitionId::new(1), Err("msg 1".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(2), Err("msg 2".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(1), Err("msg 3".into())) + .await + .expect("record failed"); + sink.record(PartitionId::new(3), Ok(())) + .await + .expect("record failed"); + + assert_eq!( + sink.results(), + HashMap::from([ + (PartitionId::new(1), Err(String::from("msg 3"))), + (PartitionId::new(2), Err(String::from("msg 2"))), + (PartitionId::new(3), Ok(())), + ]), + ); + } +} diff --git a/compactor_scheduler/src/local_scheduler/partition_done_sink/mod.rs b/compactor_scheduler/src/local_scheduler/partition_done_sink/mod.rs new file mode 100644 index 0000000000..5d0f9aec94 --- /dev/null +++ b/compactor_scheduler/src/local_scheduler/partition_done_sink/mod.rs @@ -0,0 +1,48 @@ +pub(crate) mod catalog; +pub(crate) mod mock; + +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; + +use async_trait::async_trait; +use data_types::PartitionId; + +/// Dynamic error type that is provided from the Compactor. +pub(crate) type DynError = Box; + +/// Error returned by [`PartitionDoneSink`] implementations. +#[derive(Debug, thiserror::Error)] +pub(crate) enum Error { + /// PartitionDoneSink failed during connection with catalog + #[error("Failure during catalog communication: {0}")] + Catalog(String), + + /// PartitionDoneSink failed because of an error in the unique partitions source wrapper + #[error("Failure in unique_partitions: {0}")] + UniquePartitions(#[from] crate::UniquePartitionsError), + + /// PartitionDoneSink failed because of an error in the throttler + #[error("Failure in throttler: {0}")] + Throttler(#[from] crate::ThrottleError), +} + +/// Records "partition is done" status for given partition. +#[async_trait] +pub(crate) trait PartitionDoneSink: Debug + Display + Send + Sync { + /// Record "partition is done" status for given partition. + /// + /// This method should retry. 
+ async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error>; +} + +#[async_trait] +impl PartitionDoneSink for Arc +where + T: PartitionDoneSink + ?Sized, +{ + async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error> { + self.as_ref().record(partition, res).await + } +} diff --git a/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs b/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs index d6c71f424c..acba6f0ec4 100644 --- a/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs +++ b/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, sync::Arc, sync::Mutex, time::Duration}; +use std::{fmt::Display, sync::Arc, time::Duration}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; @@ -6,6 +6,7 @@ use data_types::PartitionId; use iox_catalog::interface::Catalog; use iox_time::{Time, TimeProvider}; use observability_deps::tracing::info; +use parking_lot::Mutex; use crate::PartitionsSource; @@ -73,7 +74,7 @@ impl PartitionsSource for CatalogToCompactPartitionsSource { // scope the locking to just maintenance of last_maximum_time, not the query { // we're going check the time range we'd like to query for against the end time of the last query. - let mut last = self.last_maximum_time.lock().unwrap(); + let mut last = self.last_maximum_time.lock(); // query for partitions with activity since the last query. We shouldn't query for a time range // we've already covered. So if the prior query was 2m ago, and the query covered 10m, ending at diff --git a/compactor_scheduler/src/partitions_source.rs b/compactor_scheduler/src/partitions_source.rs index 4f614d5cbb..a4c4e0374c 100644 --- a/compactor_scheduler/src/partitions_source.rs +++ b/compactor_scheduler/src/partitions_source.rs @@ -1,10 +1,11 @@ use std::{ fmt::{Debug, Display}, - sync::{Arc, Mutex}, + sync::Arc, }; use async_trait::async_trait; use data_types::PartitionId; +use parking_lot::Mutex; /// A source of partitions, noted by [`PartitionId`](data_types::PartitionId), that may potentially need compacting. #[async_trait] @@ -48,7 +49,7 @@ mod mock { /// Set PartitionIds for MockPartitionsSource. #[allow(dead_code)] // not used anywhere pub fn set(&self, partitions: Vec) { - *self.partitions.lock().expect("not poisoned") = partitions; + *self.partitions.lock() = partitions; } } @@ -61,7 +62,7 @@ mod mock { #[async_trait] impl PartitionsSource for MockPartitionsSource { async fn fetch(&self) -> Vec { - self.partitions.lock().expect("not poisoned").clone() + self.partitions.lock().clone() } } } diff --git a/compactor_scheduler/src/scheduler.rs b/compactor_scheduler/src/scheduler.rs index 13f5b8355d..5e035c6d99 100644 --- a/compactor_scheduler/src/scheduler.rs +++ b/compactor_scheduler/src/scheduler.rs @@ -1,10 +1,13 @@ -use std::fmt::{Debug, Display}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use async_trait::async_trait; -use data_types::PartitionId; +use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; use uuid::Uuid; -use crate::LocalSchedulerConfig; +use crate::{CommitWrapper, ErrorKind, LocalSchedulerConfig, PartitionsSourceConfig}; /// Scheduler configuration. 
#[derive(Debug, Clone)] @@ -13,6 +16,19 @@ pub enum SchedulerConfig { Local(LocalSchedulerConfig), } +impl SchedulerConfig { + /// Create new [`LocalScheduler`](crate::LocalScheduler) config with a [`CommitWrapper`]. + /// + /// This is useful for testing. + pub fn new_local_with_wrapper(commit_wrapper: Arc) -> Self { + Self::Local(LocalSchedulerConfig { + shard_config: None, + partitions_source_config: PartitionsSourceConfig::default(), + commit_wrapper: Some(commit_wrapper), + }) + } +} + impl Default for SchedulerConfig { fn default() -> Self { Self::Local(LocalSchedulerConfig::default()) @@ -23,18 +39,28 @@ impl std::fmt::Display for SchedulerConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { SchedulerConfig::Local(LocalSchedulerConfig { + commit_wrapper, shard_config, partitions_source_config: _, - }) => match &shard_config { - None => write!(f, "local_compaction_scheduler"), - Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",), + }) => match (&shard_config, commit_wrapper) { + (None, None) => write!(f, "local_compaction_scheduler_cfg"), + (Some(shard_config), None) => { + write!(f, "local_compaction_scheduler_cfg({shard_config})",) + } + (Some(shard_config), Some(_)) => write!( + f, + "local_compaction_scheduler_cfg({shard_config},commit_wrapper=Some)", + ), + (None, Some(_)) => { + write!(f, "local_compaction_scheduler_cfg(commit_wrapper=Some)",) + } }, } } } /// Job assignment for a given partition. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct CompactionJob { #[allow(dead_code)] /// Unique identifier for this job. @@ -54,9 +80,144 @@ impl CompactionJob { } } +/// Commit update for a given partition. +#[derive(Debug)] +pub struct CommitUpdate { + /// Partition to be updated. + pub(crate) partition_id: PartitionId, + /// Files to be deleted. + pub(crate) delete: Vec, + /// Files to be upgraded. + pub(crate) upgrade: Vec, + /// Target level for upgraded files. + pub(crate) target_level: CompactionLevel, + /// Files to be created. + pub(crate) create: Vec, +} + +impl CommitUpdate { + /// Create new commit update. + pub fn new( + partition_id: PartitionId, + delete: Vec, + upgrade: Vec, + create: Vec, + target_level: CompactionLevel, + ) -> Self { + Self { + partition_id, + delete, + upgrade, + target_level, + create, + } + } +} + +/// Status. +#[derive(Debug)] +pub enum CompactionJobStatusVariant { + /// Updates associated with ongoing compaction job. + Update(CommitUpdate), + /// Ongoing compaction job error. + /// + /// These errors are not fatal, as some of the compaction job branches may succeed. + Error(ErrorKind), +} + +/// Status ([`CompactionJobStatusVariant`]) associated with a [`CompactionJob`]. +#[derive(Debug)] +pub struct CompactionJobStatus { + /// Job. + pub job: CompactionJob, + /// Status. + pub status: CompactionJobStatusVariant, +} + +/// Response to a [`CompactionJobStatus`]. +#[derive(Debug)] +pub enum CompactionJobStatusResponse { + /// Acknowledge receipt of a [`CompactionJobStatusVariant::Error`] request. + Ack, + /// IDs of the created files that were processed. + /// + /// This is the response to a [`CompactionJobStatusVariant::Update`] request. + CreatedParquetFiles(Vec), +} + +/// Reason for skipping a partition. +#[derive(Debug)] +pub struct SkipReason(pub String); + +/// Options for ending a compaction job. +#[derive(Debug)] +pub enum CompactionJobEndVariant { + /// Request to skip partition. + RequestToSkip(SkipReason), + /// Compaction job is complete. 
+ Complete, +} + +/// End action ([`CompactionJobEndVariant`]) associated with a [`CompactionJob`]. +#[derive(Debug)] +pub struct CompactionJobEnd { + /// Job. + pub job: CompactionJob, + /// End action. + pub end_action: CompactionJobEndVariant, +} + /// Core trait used for all schedulers. #[async_trait] pub trait Scheduler: Send + Sync + Debug + Display { /// Get partitions to be compacted. async fn get_jobs(&self) -> Vec; + + /// Compactors call this function to send an update to the scheduler + /// on the status of a compaction job that compactor was assigned. + /// + /// Compactor sends a [`CompactionJobStatus`]. + /// Scheduler returns a [`CompactionJobStatusResponse`]. + /// + /// Compactor can send multiple [`CompactionJobStatus`] requests for the same job. + async fn update_job_status( + &self, + job_status: CompactionJobStatus, + ) -> Result>; + + /// Compactor sends a [`CompactionJobEnd`], to end a job. + /// + /// This method can only be called once per job. + async fn end_job( + &self, + end_action: CompactionJobEnd, + ) -> Result<(), Box>; +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::Commit; + + use super::*; + + #[test] + fn test_cfg_display_new_local_with_wrapper() { + #[derive(Debug)] + struct MockCommitWrapper; + + impl CommitWrapper for MockCommitWrapper { + fn wrap(&self, commit: Arc) -> Arc { + commit + } + } + + let config = SchedulerConfig::new_local_with_wrapper(Arc::new(MockCommitWrapper)); + + assert_eq!( + config.to_string(), + "local_compaction_scheduler_cfg(commit_wrapper=Some)" + ); + } } diff --git a/compactor_scheduler/tests/helpers.rs b/compactor_scheduler/tests/helpers.rs new file mode 100644 index 0000000000..08874c1757 --- /dev/null +++ b/compactor_scheduler/tests/helpers.rs @@ -0,0 +1,129 @@ +//! Helpers for testing. +//! +//! Usable for any Scheduler API (remote or local). 
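The integration-test helpers below drive exactly this protocol. For orientation, a minimal sketch of one job going through a replacement commit and then being ended; the `finish_job` helper is illustrative only, and its boxed error type simply mirrors the trait's return types.

```rust
use std::sync::Arc;

use compactor_scheduler::{
    CommitUpdate, CompactionJob, CompactionJobEnd, CompactionJobEndVariant, CompactionJobStatus,
    CompactionJobStatusResponse, CompactionJobStatusVariant, Scheduler,
};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams};

/// Sketch: finish a single job obtained from `Scheduler::get_jobs()`.
async fn finish_job(
    scheduler: Arc<dyn Scheduler>,
    job: CompactionJob,
    deleted: Vec<ParquetFile>,
    created: Vec<ParquetFileParams>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 1) replacement commit: `deleted` files are swapped for `created` files at the target level
    let response = scheduler
        .update_job_status(CompactionJobStatus {
            job: job.clone(),
            status: CompactionJobStatusVariant::Update(CommitUpdate::new(
                job.partition_id,
                deleted,
                vec![], // nothing to upgrade in this sketch
                created,
                CompactionLevel::Final,
            )),
        })
        .await?;
    if let CompactionJobStatusResponse::CreatedParquetFiles(ids) = response {
        // catalog ids of the newly created parquet files
        let _ = ids;
    }

    // 2) end the job exactly once; without a prior commit the partition would be throttled
    scheduler
        .end_job(CompactionJobEnd {
            job,
            end_action: CompactionJobEndVariant::Complete,
        })
        .await
}
```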
+ +use std::sync::Arc; + +use assert_matches::assert_matches; +use compactor_scheduler::{ + CommitUpdate, CompactionJob, CompactionJobEnd, CompactionJobEndVariant, CompactionJobStatus, + CompactionJobStatusResponse, CompactionJobStatusVariant, ErrorKind, Scheduler, SkipReason, +}; +use data_types::{CompactionLevel, ParquetFile, ParquetFileParams}; + +pub async fn can_do_replacement_commit( + scheduler: Arc, + job: CompactionJob, + deleted_parquet_files: Vec, + created_parquet_files: Vec, +) { + let commit_update = CommitUpdate::new( + job.partition_id, + deleted_parquet_files, + vec![], + created_parquet_files, + CompactionLevel::Final, + ); + + let res = scheduler + .update_job_status(CompactionJobStatus { + job, + status: CompactionJobStatusVariant::Update(commit_update), + }) + .await; + + assert_matches!( + res, + Ok(CompactionJobStatusResponse::CreatedParquetFiles(files)) if files.len() == 1, + "expected replacement commit to succeed with exactly one file to be created, got {:?}", res + ); +} + +pub async fn can_do_upgrade_commit( + scheduler: Arc, + job: CompactionJob, + existing_parquet_file: ParquetFile, +) { + let commit_update = CommitUpdate::new( + job.partition_id, + vec![], + vec![existing_parquet_file], + vec![], + CompactionLevel::Final, + ); + + let res = scheduler + .update_job_status(CompactionJobStatus { + job, + status: CompactionJobStatusVariant::Update(commit_update), + }) + .await; + + assert_matches!( + res, + Ok(CompactionJobStatusResponse::CreatedParquetFiles(files)) if files.is_empty(), + "expected upgrade commit to succeed with no files to be created, got {:?}", res + ); +} + +pub async fn can_send_error(scheduler: Arc, job: CompactionJob) { + let res = scheduler + .update_job_status(CompactionJobStatus { + job, + status: CompactionJobStatusVariant::Error(ErrorKind::Unknown( + "error reported (without partition-skip request)".into(), + )), + }) + .await; + + assert_matches!( + res, + Ok(CompactionJobStatusResponse::Ack), + "expected error to be accepted, got {:?}", + res + ); +} + +pub async fn can_do_complete(scheduler: Arc, job: CompactionJob) { + let res = scheduler + .end_job(CompactionJobEnd { + job, + end_action: CompactionJobEndVariant::Complete, + }) + .await; + + assert_matches!( + res, + Ok(()), + "expected job to be marked as complete, got {:?}", + res + ); +} + +pub async fn can_do_skip_request(scheduler: Arc, job: CompactionJob) { + let res = scheduler + .end_job(CompactionJobEnd { + job, + end_action: CompactionJobEndVariant::RequestToSkip(SkipReason( + "error reason given for request the skip".into(), + )), + }) + .await; + + assert_matches!( + res, + Ok(()), + "expected job to be marked as skipped, got {:?}", + res + ); +} + +pub async fn assert_all_partitions_leased(scheduler: Arc) { + let jobs = scheduler.get_jobs().await; + assert_matches!( + jobs[..], + [], + "expected no partition found (all partitions leased), but instead got {:?}", + jobs, + ); +} diff --git a/compactor_scheduler/tests/local_scheduler/end_job.rs b/compactor_scheduler/tests/local_scheduler/end_job.rs new file mode 100644 index 0000000000..f3f7df6fa7 --- /dev/null +++ b/compactor_scheduler/tests/local_scheduler/end_job.rs @@ -0,0 +1,213 @@ +use std::sync::Arc; + +use assert_matches::assert_matches; +use compactor_scheduler::{CompactionJob, CompactionJobEnd, CompactionJobEndVariant, SkipReason}; +use data_types::SkippedCompaction; + +use super::{super::helpers, TestLocalScheduler}; + +#[tokio::test] +async fn test_must_end_job_to_make_partition_reavailable() { + 
test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let mut jobs = scheduler.get_jobs().await; + let (existing_1, _) = test_scheduler.get_seeded_files(); + + // upgrade commit (to avoid throttling) + helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await; + + // lease is still in place (even after commits) + helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await; + + // mark job as complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: partition is available again + jobs = scheduler.get_jobs().await; + let expected_partition = test_scheduler.get_partition_id(); + assert_matches!( + jobs[..], + [CompactionJob { partition_id, .. }] if partition_id == expected_partition, + "expect partition is available again, instead found {:?}", jobs + ); +} + +#[tokio::test] +async fn test_job_ended_without_commits_should_be_throttled() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let mut jobs = scheduler.get_jobs().await; + + // no commit + + // lease is still in place + helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await; + + // mark job as complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: partition is throttled + jobs = scheduler.get_jobs().await; + assert_matches!( + jobs[..], + [], + "expect partition should be throttled, instead found {:?}", + jobs + ); +} + +#[tokio::test] +async fn test_error_reporting_is_not_sufficient_to_avoid_throttling() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let mut jobs = scheduler.get_jobs().await; + + // no commit + // only error reporting + helpers::can_send_error(Arc::clone(&scheduler), jobs[0].clone()).await; + + // lease is still in place + helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await; + + // mark job as complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: partition is throttled + jobs = scheduler.get_jobs().await; + assert_matches!( + jobs[..], + [], + "expect partition should be throttled, instead found {:?}", + jobs + ); +} + +#[tokio::test] +async fn test_after_complete_job_cannot_end_again() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + + // mark job as complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: end_job complete => will fail, since job is already complete + let res = scheduler + .end_job(CompactionJobEnd { + job: jobs[0].clone(), + end_action: CompactionJobEndVariant::Complete, + }) + .await; + assert_matches!( + res, + Err(err) if err.to_string().contains("Unknown or already done partition:"), + "should error if attempt complete after complete, instead found {:?}", res + ); + + // TEST: end_job skip request => will fail, since job is already complete + let res = scheduler + .end_job(CompactionJobEnd { + job: jobs[0].clone(), + end_action: CompactionJobEndVariant::RequestToSkip(SkipReason( + "skip this partition".to_string(), + )), + }) + .await; + assert_matches!( + res, + Err(err) if 
err.to_string().contains("Unknown or already done partition:"), + "should error if attempt skip partition after complete, instead found {:?}", res + ); +} + +#[tokio::test] +async fn test_after_skip_request_cannot_end_again() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + + // mark partition as skipped (also completes the job) + helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: end_job complete => will fail, since job is already complete + let res = scheduler + .end_job(CompactionJobEnd { + job: jobs[0].clone(), + end_action: CompactionJobEndVariant::Complete, + }) + .await; + assert_matches!( + res, + Err(err) if err.to_string().contains("Unknown or already done partition:"), + "should error if attempt complete after skip request, instead found {:?}", res + ); + + // TEST: end_job skip request => will fail, since job is already complete + let res = scheduler + .end_job(CompactionJobEnd { + job: jobs[0].clone(), + end_action: CompactionJobEndVariant::RequestToSkip(SkipReason( + "skip this partition".to_string(), + )), + }) + .await; + assert_matches!( + res, + Err(err) if err.to_string().contains("Unknown or already done partition:"), + "should error if attempt skip request after skip request, instead found {:?}", res + ); +} + +#[tokio::test] +async fn test_what_skip_request_does() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let expected_partition = test_scheduler.get_partition_id(); + let scheduler = Arc::clone(&test_scheduler.scheduler); + let catalog = test_scheduler.catalog.catalog(); + + let mut jobs = scheduler.get_jobs().await; + let (existing_1, _) = test_scheduler.get_seeded_files(); + + // upgrade commit (to avoid throttling as a confounding variable) + helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await; + + helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await; + + // mark partition as skipped (also completes the job) + helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: partition still appears in get_jobs() + // todo: plan to update this behavior. Move compactor skip-filtering to scheduler. + jobs = scheduler.get_jobs().await; + assert_matches!( + jobs[..], + [CompactionJob { partition_id, .. }] if partition_id == expected_partition, + "expect partition is still available in get_jobs() (because skip-filtering is compactor-side), instead found {:?}", jobs + ); + + // but partition in catalog is marked as skipped + // will be consumed in compactor `SkippedCompactionsSource` + let catalog_marked_as_skipped = catalog + .repositories() + .await + .partitions() + .get_in_skipped_compaction(expected_partition) + .await; + assert_matches!( + catalog_marked_as_skipped, + Ok(Some(SkippedCompaction { partition_id, .. 
})) if partition_id == expected_partition, + "expect partition should be marked as skipped in catalog, instead found {:?}", catalog_marked_as_skipped + ); +} diff --git a/compactor_scheduler/tests/local_scheduler/get_jobs.rs b/compactor_scheduler/tests/local_scheduler/get_jobs.rs new file mode 100644 index 0000000000..00e17797de --- /dev/null +++ b/compactor_scheduler/tests/local_scheduler/get_jobs.rs @@ -0,0 +1,104 @@ +use std::sync::Arc; + +use assert_matches::assert_matches; +use compactor_scheduler::{create_test_scheduler, CompactionJob}; +use data_types::PartitionId; +use iox_tests::TestCatalog; + +use super::{super::helpers, TestLocalScheduler}; + +#[tokio::test] +async fn test_mocked_partition_ids() { + let catalog = TestCatalog::new(); + let partitions = vec![PartitionId::new(0), PartitionId::new(1234242)]; + + let scheduler = create_test_scheduler( + catalog.catalog(), + Arc::clone(&catalog.time_provider()), + Some(partitions.clone()), + ); + + let mut result = scheduler + .get_jobs() + .await + .iter() + .map(|j| j.partition_id) + .collect::>(); + result.sort(); + + assert_eq!( + result, + partitions, + "expected the partitions provided to create_test_scheduler() should be returned on `get_jobs()`, instead got `{:?}`", result + ); +} + +#[tokio::test] +async fn test_returns_hot_partition() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + + let jobs = test_scheduler.scheduler.get_jobs().await; + test_scheduler.assert_matches_seeded_hot_partition(&jobs); +} + +#[tokio::test] +async fn test_will_not_fetch_leased_partitions() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + + let jobs = scheduler.get_jobs().await; + test_scheduler.assert_matches_seeded_hot_partition(&jobs); + + helpers::assert_all_partitions_leased(scheduler).await; +} + +#[tokio::test] +async fn test_can_refetch_previously_completed_partitions() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let expected_partition = test_scheduler.get_partition_id(); + let scheduler = Arc::clone(&test_scheduler.scheduler); + let mut jobs = scheduler.get_jobs().await; + let (existing_1, _) = test_scheduler.get_seeded_files(); + + // make commit, so not throttled + helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await; + // complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: can refetch + jobs = scheduler.get_jobs().await; + assert_matches!( + jobs[..], + [CompactionJob { partition_id, .. 
}] if partition_id == expected_partition, + "expected partition is available after lease is complete, but found {:?}", jobs + ); +} + +#[tokio::test] +async fn test_will_throttle_partition_if_no_commits() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let mut jobs = scheduler.get_jobs().await; + + // no commit + + // complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: is throttled + jobs = scheduler.get_jobs().await; + assert_matches!( + jobs[..], + [], + "expect partition should be throttled, but found {:?}", + jobs + ); +} diff --git a/compactor_scheduler/tests/local_scheduler/mod.rs b/compactor_scheduler/tests/local_scheduler/mod.rs new file mode 100644 index 0000000000..a2638be1b5 --- /dev/null +++ b/compactor_scheduler/tests/local_scheduler/mod.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; + +use assert_matches::assert_matches; +use compactor_scheduler::{ + create_scheduler, CompactionJob, LocalSchedulerConfig, Scheduler, SchedulerConfig, +}; +use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId}; +use iox_tests::{ParquetFileBuilder, TestCatalog, TestParquetFileBuilder, TestPartition}; + +mod end_job; +mod get_jobs; +mod update_job_status; + +/// Test local_scheduler, with seeded files, used in these integration tests +#[derive(Debug)] +pub struct TestLocalScheduler { + pub scheduler: Arc, + pub catalog: Arc, + test_partition: Arc, + seeded_files: (ParquetFile, ParquetFile), +} + +impl TestLocalScheduler { + pub async fn builder() -> Self { + // create a catalog with a table with one partition + let catalog = TestCatalog::new(); + let ns = catalog.create_namespace_with_retention("ns", None).await; + let table = ns.create_table("table1").await; + table.create_column("time", ColumnType::Time).await; + table.create_column("load", ColumnType::F64).await; + let partition = table.create_partition("k").await; + + // file builder + let file_builder = TestParquetFileBuilder::default().with_line_protocol("table1 load=1 11"); + // seed with two existing parquet files + let seeded_files = ( + partition + .create_parquet_file(file_builder.clone()) + .await + .into(), + partition.create_parquet_file(file_builder).await.into(), + ); + + // local scheduler in default config + // created partitions are "hot" and should be returned with `get_jobs()` + let scheduler = create_scheduler( + SchedulerConfig::Local(LocalSchedulerConfig::default()), + catalog.catalog(), + Arc::clone(&catalog.time_provider()), + Arc::new(metric::Registry::default()), + false, + ); + + Self { + scheduler, + catalog, + test_partition: partition, + seeded_files, + } + } + + pub fn get_seeded_files(&self) -> (ParquetFile, ParquetFile) { + self.seeded_files.clone() + } + + pub async fn create_params_for_new_parquet_file(&self) -> ParquetFileParams { + ParquetFileBuilder::new(42) + .with_partition(self.get_partition_id().get()) + .build() + .into() + } + + pub fn assert_matches_seeded_hot_partition(&self, jobs: &[CompactionJob]) { + assert_matches!( + jobs[..], + [CompactionJob { partition_id, ..}] if partition_id == self.test_partition.partition.id + ); + } + + /// currently has only 1 partition seeded (by default) + pub fn get_partition_id(&self) -> PartitionId { + self.test_partition.partition.id + } +} diff --git a/compactor_scheduler/tests/local_scheduler/update_job_status.rs b/compactor_scheduler/tests/local_scheduler/update_job_status.rs new file mode 
100644 index 0000000000..86b9ebd24f --- /dev/null +++ b/compactor_scheduler/tests/local_scheduler/update_job_status.rs @@ -0,0 +1,259 @@ +use std::sync::Arc; + +use assert_matches::assert_matches; +use compactor_scheduler::{CommitUpdate, CompactionJobStatus, CompactionJobStatusVariant}; +use data_types::CompactionLevel; + +use super::{super::helpers, TestLocalScheduler}; + +#[tokio::test] +async fn test_has_two_commit_types() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + let (existing_1, existing_2) = test_scheduler.get_seeded_files(); + + // upgrade commit + helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await; + + // replacement commit + helpers::can_do_replacement_commit( + scheduler, + jobs[0].clone(), + vec![existing_2], // deleted parquet_files + vec![test_scheduler.create_params_for_new_parquet_file().await], // to_create + ) + .await; +} + +#[tokio::test] +async fn test_no_empty_commits_permitted() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + + // empty commit + let res = scheduler + .update_job_status(CompactionJobStatus { + job: jobs[0].clone(), + status: CompactionJobStatusVariant::Update(CommitUpdate::new( + test_scheduler.get_partition_id(), + vec![], + vec![], + vec![], + CompactionLevel::Final, // no args + )), + }) + .await; + + assert_matches!( + res, + Err(err) if err.to_string().contains("commit must have files to upgrade, and/or a set of files to replace (delete and create)"), + "should reject empty commits (no args provided), instead got {:?}", res + ); +} + +#[tokio::test] +async fn test_incomplete_replacement_commits_should_error() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + let (existing_1, existing_2) = test_scheduler.get_seeded_files(); + + // incomplete replacement commit + let res = scheduler + .update_job_status(CompactionJobStatus { + job: jobs[0].clone(), + status: CompactionJobStatusVariant::Update(CommitUpdate::new( + test_scheduler.get_partition_id(), + vec![existing_1.clone()], // to delete + vec![], + vec![], // missing to create + CompactionLevel::Final, + )), + }) + .await; + + assert_matches!( + res, + Err(err) if err.to_string().contains("replacement commits must have both files to delete and files to create"), + "should reject incomplete replacement commit, instead got {:?}", res + ); + + // incomplete replacement commit, + complete upgrade commit + let res = scheduler + .update_job_status(CompactionJobStatus { + job: jobs[0].clone(), + status: CompactionJobStatusVariant::Update(CommitUpdate::new( + test_scheduler.get_partition_id(), + vec![existing_1], // to delete + vec![existing_2], // to upgrade + vec![], // missing to create + CompactionLevel::Final, + )), + }) + .await; + + assert_matches!( + res, + Err(err) if err.to_string().contains("replacement commits must have both files to delete and files to create"), + "should reject incomplete replacement commit (even when an upgrade commit is also provided), but found {:?}", res + ); +} + +#[tokio::test] +async fn test_has_error_reporting() { + test_helpers::maybe_start_logging(); + + let 
test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + + helpers::can_send_error(scheduler, jobs[0].clone()).await; +} + +#[tokio::test] +async fn test_can_commit_after_error_reporting() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + let (existing_1, existing_2) = test_scheduler.get_seeded_files(); + + // error + helpers::can_send_error(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: can do commits + helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await; + helpers::can_do_replacement_commit( + scheduler, + jobs[0].clone(), + vec![existing_2], // deleted + vec![test_scheduler.create_params_for_new_parquet_file().await], // to create + ) + .await; +} + +#[tokio::test] +async fn test_can_report_errors_after_commits() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + let (existing_1, existing_2) = test_scheduler.get_seeded_files(); + + // do commits + helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await; + helpers::can_do_replacement_commit( + Arc::clone(&scheduler), + jobs[0].clone(), + vec![existing_2], // deleted + vec![test_scheduler.create_params_for_new_parquet_file().await], // to create + ) + .await; + + // TEST: can still send error + helpers::can_send_error(scheduler, jobs[0].clone()).await; +} + +#[tokio::test] +async fn test_cannot_commit_after_complete() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + let (existing_1, _) = test_scheduler.get_seeded_files(); + + // mark job as complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: cannot do a commit + let res = scheduler + .update_job_status(CompactionJobStatus { + job: jobs[0].clone(), + status: CompactionJobStatusVariant::Update(CommitUpdate::new( + test_scheduler.get_partition_id(), + vec![], + vec![existing_1], + vec![], + CompactionLevel::Final, + )), + }) + .await; + assert_matches!( + res, + Err(err) if err.to_string().contains("Unknown or already done partition:"), + "should reject commit after complete, but found {:?}", res + ); +} + +#[tokio::test] +async fn test_can_do_more_error_reporting_after_complete() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + + // mark job as complete + helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: can do more error reporting + helpers::can_send_error(scheduler, jobs[0].clone()).await; +} + +#[tokio::test] +async fn test_cannot_commit_after_skipping() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + let (existing_1, _) = test_scheduler.get_seeded_files(); + + // mark partition as skipped (also completes the job) + helpers::can_do_skip_request(Arc::clone(&scheduler), 
jobs[0].clone()).await; + + // TEST: cannot do a commit + let res = scheduler + .update_job_status(CompactionJobStatus { + job: jobs[0].clone(), + status: CompactionJobStatusVariant::Update(CommitUpdate::new( + test_scheduler.get_partition_id(), + vec![], + vec![existing_1], + vec![], + CompactionLevel::Final, + )), + }) + .await; + assert_matches!( + res, + Err(err) if err.to_string().contains("Unknown or already done partition:"), + "should reject commit after skipping, but found {:?}", res + ); +} + +#[tokio::test] +async fn test_can_do_more_error_reporting_after_skipping() { + test_helpers::maybe_start_logging(); + + let test_scheduler = TestLocalScheduler::builder().await; + let scheduler = Arc::clone(&test_scheduler.scheduler); + let jobs = scheduler.get_jobs().await; + + // mark partition as skipped (also completes the job) + helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await; + + // TEST: can do more error reporting + helpers::can_send_error(scheduler, jobs[0].clone()).await; +} diff --git a/compactor_scheduler/tests/mod.rs b/compactor_scheduler/tests/mod.rs new file mode 100644 index 0000000000..d77d6e6df1 --- /dev/null +++ b/compactor_scheduler/tests/mod.rs @@ -0,0 +1,2 @@ +mod helpers; +mod local_scheduler; diff --git a/compactor_test_utils/src/commit_wrapper.rs b/compactor_test_utils/src/commit_wrapper.rs index 388c62fb5c..45aa7f5033 100644 --- a/compactor_test_utils/src/commit_wrapper.rs +++ b/compactor_test_utils/src/commit_wrapper.rs @@ -1,7 +1,7 @@ //! Handles recording commit information to the test run log use async_trait::async_trait; -use compactor::{Commit, CommitWrapper}; +use compactor_scheduler::{Commit, CommitError, CommitWrapper}; use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; use std::{ fmt::{Debug, Display}, @@ -34,7 +34,7 @@ impl Commit for CommitRecorder { upgrade: &[ParquetFile], create: &[ParquetFileParams], target_level: CompactionLevel, - ) -> Vec<ParquetFileId> { + ) -> Result<Vec<ParquetFileId>, CommitError> { if let Some(invariant_check) = self.invariant_check.as_ref() { invariant_check.check().await }; diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs index 3929ce61b6..deb87ae9b7 100644 --- a/compactor_test_utils/src/lib.rs +++ b/compactor_test_utils/src/lib.rs @@ -127,7 +127,7 @@ impl TestSetupBuilder { metric_registry: catalog.metric_registry(), trace_collector, catalog: catalog.catalog(), - scheduler_config: SchedulerConfig::default(), + scheduler_config: SchedulerConfig::new_local_with_wrapper(Arc::new(commit_wrapper)), parquet_store_real: catalog.parquet_store.clone(), parquet_store_scratchpad: ParquetStorage::new( Arc::new(object_store::memory::InMemory::new()), @@ -150,7 +150,6 @@ impl TestSetupBuilder { process_once: true, simulate_without_object_store: false, parquet_files_sink_override: None, - commit_wrapper: Some(Arc::new(commit_wrapper)), all_errors_are_fatal: true, max_num_columns_per_table: 200, max_num_files_per_plan: 200, diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 63f376d562..c285f6a487 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -185,7 +185,6 @@ pub async fn create_compactor_server_type( process_once: compactor_config.process_once, simulate_without_object_store: false, parquet_files_sink_override: None, - commit_wrapper: None, all_errors_are_fatal: false, max_num_columns_per_table: compactor_config.max_num_columns_per_table, max_num_files_per_plan: compactor_config.max_num_files_per_plan, diff --git 
a/ioxd_compactor/src/scheduler_config.rs b/ioxd_compactor/src/scheduler_config.rs index ca3156d942..3be3eda2e3 100644 --- a/ioxd_compactor/src/scheduler_config.rs +++ b/ioxd_compactor/src/scheduler_config.rs @@ -66,6 +66,7 @@ fn convert_shard_config(config: ShardConfigForLocalScheduler) -> Option<ShardConfig> { pub fn convert_scheduler_config(config: CompactorSchedulerConfig) -> SchedulerConfig { match config.compactor_scheduler_type { CompactorSchedulerType::Local => SchedulerConfig::Local(LocalSchedulerConfig { + commit_wrapper: None, partitions_source_config: convert_partitions_source_config( config.partition_source_config, ), diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 5d2b5ae333..217a01eb44 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -67,7 +67,7 @@ prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } -regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } +regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "dfa-search", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.7" } reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] } ring = { version = "0.16", features = ["std"] } @@ -138,7 +138,7 @@ prost-types = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } -regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } +regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "dfa-search", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.7" } ring = { version = "0.16", features = ["std"] } rustls = { version = "0.21", default-features = false, features = ["dangerous_configuration", "logging", "tls12"] }
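Reviewer note, not part of the patch: a minimal sketch of how the Scheduler API exercised by the tests above fits together end to end, assuming it lives next to them in compactor_scheduler/tests/local_scheduler/ so the TestLocalScheduler fixture resolves via `use super::TestLocalScheduler;`. It leases a job with get_jobs(), then reports an upgrade commit through update_job_status() with a CommitUpdate (partition id, delete, upgrade, create, target level, in the order the tests use); the final success assertion is illustrative only, since the Ok payload of update_job_status() is not shown in this diff.

use std::sync::Arc;

use compactor_scheduler::{CommitUpdate, CompactionJobStatus, CompactionJobStatusVariant};
use data_types::CompactionLevel;

use super::TestLocalScheduler; // fixture introduced in mod.rs above (assumed sibling module)

#[tokio::test]
async fn sketch_upgrade_commit_roundtrip() {
    // seed a catalog with one hot partition and two parquet files
    let test_scheduler = TestLocalScheduler::builder().await;
    let scheduler = Arc::clone(&test_scheduler.scheduler);
    let (existing_1, _existing_2) = test_scheduler.get_seeded_files();

    // lease the available compaction job for the seeded partition
    let jobs = scheduler.get_jobs().await;
    test_scheduler.assert_matches_seeded_hot_partition(&jobs);

    // report an upgrade commit: existing_1 is promoted to the target level;
    // nothing is deleted or created, so this is the minimal non-empty commit
    let res = scheduler
        .update_job_status(CompactionJobStatus {
            job: jobs[0].clone(),
            status: CompactionJobStatusVariant::Update(CommitUpdate::new(
                test_scheduler.get_partition_id(),
                vec![],           // files to delete
                vec![existing_1], // files to upgrade
                vec![],           // files to create
                CompactionLevel::Final,
            )),
        })
        .await;
    assert!(res.is_ok(), "upgrade commit should be accepted, got {:?}", res);
}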