Merge branch 'main' into dom/query-pruning-bench

pull/24376/head
Dom 2023-07-25 11:06:39 +01:00 committed by GitHub
commit 0c940222d2
45 changed files with 2002 additions and 405 deletions

Cargo.lock generated
View File

@ -56,9 +56,9 @@ dependencies = [
[[package]]
name = "allocator-api2"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56fc6cf8dc8c4158eed8649f9b8b0ea1518eb62b544fe9490d66fa0b349eafe9"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]]
name = "android-tzdata"
@ -132,9 +132,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.71"
version = "1.0.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854"
[[package]]
name = "arrayref"
@ -450,9 +450,9 @@ dependencies = [
[[package]]
name = "async-compression"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b0122885821398cc923ece939e24d1056a2384ee719432397fa9db87230ff11"
checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6"
dependencies = [
"bzip2",
"flate2",
@ -485,7 +485,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -496,7 +496,7 @@ checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -537,9 +537,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.18"
version = "0.6.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8175979259124331c1d7bf6586ee7e0da434155e4b2d48ec2c8386281d8df39"
checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c"
dependencies = [
"async-trait",
"axum-core",
@ -650,9 +650,9 @@ dependencies = [
[[package]]
name = "blake3"
version = "1.4.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729b71f35bd3fa1a4c86b85d32c8b9069ea7fe14f7a53cfabb65f62d4265b888"
checksum = "199c42ab6972d92c9f8995f086273d25c42fc0f7b2a1fcefba465c1352d25ba5"
dependencies = [
"arrayref",
"arrayvec",
@ -694,13 +694,12 @@ dependencies = [
[[package]]
name = "bstr"
version = "1.5.0"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a246e68bb43f6cd9db24bea052a53e40405417c5fb372e3d1a8a7f770a564ef5"
checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05"
dependencies = [
"memchr",
"once_cell",
"regex-automata 0.1.10",
"regex-automata 0.3.3",
"serde",
]
@ -907,7 +906,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -998,14 +997,21 @@ dependencies = [
name = "compactor_scheduler"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"backoff",
"data_types",
"futures",
"iox_catalog",
"iox_tests",
"iox_time",
"itertools 0.11.0",
"metric",
"observability_deps",
"parking_lot",
"sharder",
"test_helpers",
"thiserror",
"tokio",
"uuid",
"workspace-hack",
@ -1127,9 +1133,9 @@ dependencies = [
[[package]]
name = "constant_time_eq"
version = "0.2.6"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21a53c0a4d288377e7415b53dcfc3c04da5cdc2cc95c8d5ac178b58f0b861ad6"
checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2"
[[package]]
name = "core-foundation-sys"
@ -1148,9 +1154,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
version = "0.2.8"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03e69e28e9f7f77debdedbaafa2866e1de9ba56df55a8bd7cfc724c25a09987c"
checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1"
dependencies = [
"libc",
]
@ -1327,12 +1333,12 @@ dependencies = [
[[package]]
name = "dashmap"
version = "5.4.0"
version = "5.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d"
dependencies = [
"cfg-if",
"hashbrown 0.12.3",
"hashbrown 0.14.0",
"lock_api",
"once_cell",
"parking_lot_core",
@ -1454,7 +1460,7 @@ dependencies = [
"lazy_static",
"sqlparser 0.35.0",
"strum 0.25.0",
"strum_macros 0.25.0",
"strum_macros 0.25.1",
]
[[package]]
@ -1660,9 +1666,9 @@ dependencies = [
[[package]]
name = "equivalent"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1"
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
@ -1917,7 +1923,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -2271,9 +2277,9 @@ dependencies = [
[[package]]
name = "http-range-header"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f"
[[package]]
name = "httparse"
@ -2319,10 +2325,11 @@ dependencies = [
[[package]]
name = "hyper-rustls"
version = "0.24.0"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7"
checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97"
dependencies = [
"futures-util",
"http",
"hyper",
"rustls",
@ -3166,9 +3173,9 @@ dependencies = [
[[package]]
name = "itoa"
version = "1.0.7"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0aa48fab2893d8a49caa94082ae8488f4e1050d73b367881dcd2198f4199fd8"
checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "jobserver"
@ -3619,9 +3626,9 @@ dependencies = [
[[package]]
name = "num"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43db66d1170d347f9a065114077f7dccb00c1b9478c89384490a3425279a4606"
checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af"
dependencies = [
"num-bigint",
"num-complex",
@ -4026,9 +4033,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "pest"
version = "2.7.0"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f73935e4d55e2abf7f130186537b19e7a4abc886a0252380b59248af473a3fc9"
checksum = "0d2d1d55045829d65aad9d389139882ad623b33b904e7c9f1b10c5b8927298e5"
dependencies = [
"thiserror",
"ucd-trie",
@ -4036,9 +4043,9 @@ dependencies = [
[[package]]
name = "pest_derive"
version = "2.7.0"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aef623c9bbfa0eedf5a0efba11a5ee83209c326653ca31ff019bec3a95bfff2b"
checksum = "5f94bca7e7a599d89dea5dfa309e217e7906c3c007fb9c3299c40b10d6a315d3"
dependencies = [
"pest",
"pest_generator",
@ -4046,22 +4053,22 @@ dependencies = [
[[package]]
name = "pest_generator"
version = "2.7.0"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3e8cba4ec22bada7fc55ffe51e2deb6a0e0db2d0b7ab0b103acc80d2510c190"
checksum = "99d490fe7e8556575ff6911e45567ab95e71617f43781e5c05490dc8d75c965c"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
name = "pest_meta"
version = "2.7.0"
version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a01f71cb40bd8bb94232df14b946909e14660e33fc05db3e50ae2a82d7ea0ca0"
checksum = "2674c66ebb4b4d9036012091b537aae5878970d6999f81a265034d85b136b341"
dependencies = [
"once_cell",
"pest",
@ -4133,7 +4140,7 @@ checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -4285,9 +4292,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.63"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
@ -4473,9 +4480,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.29"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "573015e8ab27661678357f27dc26460738fd2b6c86e46f386fde94cb5d913105"
checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
dependencies = [
"proc-macro2",
]
@ -4816,9 +4823,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.12"
version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06"
checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4"
[[package]]
name = "rustyline"
@ -4843,9 +4850,9 @@ dependencies = [
[[package]]
name = "ryu"
version = "1.0.13"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041"
checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741"
[[package]]
name = "same-file"
@ -4886,15 +4893,15 @@ dependencies = [
[[package]]
name = "semver"
version = "1.0.17"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
checksum = "b0293b4b29daaf487284529cc2f5675b8e57c61f70167ba415a463651fd6a918"
[[package]]
name = "seq-macro"
version = "0.3.3"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b44e8fc93a14e66336d230954dda83d18b4605ccace8fe09bc7514a71ad0bc"
checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
@ -4913,7 +4920,7 @@ checksum = "b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -5563,9 +5570,9 @@ checksum = "9091b6114800a5f2141aee1d1b9d6ca3592ac062dc5decb3764ec5895a47b4eb"
[[package]]
name = "stringprep"
version = "0.1.2"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1"
checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da"
dependencies = [
"unicode-bidi",
"unicode-normalization",
@ -5589,7 +5596,7 @@ version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125"
dependencies = [
"strum_macros 0.25.0",
"strum_macros 0.25.1",
]
[[package]]
@ -5607,15 +5614,15 @@ dependencies = [
[[package]]
name = "strum_macros"
version = "0.25.0"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe9f3bd7d2e45dcc5e265fbb88d6513e4747d8ef9444cf01a533119bce28a157"
checksum = "6069ca09d878a33f883cc06aaa9718ede171841d3832450354410b718b097232"
dependencies = [
"heck",
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -5626,9 +5633,9 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "symbolic-common"
version = "12.2.0"
version = "12.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f7afd8bcd36190409e6b71d89928f7f09d918a7aa3460d847bc49a538d672e"
checksum = "167a4ffd7c35c143fd1030aa3c2caf76ba42220bd5a6b5f4781896434723b8c3"
dependencies = [
"debugid",
"memmap2",
@ -5638,9 +5645,9 @@ dependencies = [
[[package]]
name = "symbolic-demangle"
version = "12.2.0"
version = "12.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec64922563a36e3fe686b6d99f06f25dacad2a202ac7502ed642930a188fb20a"
checksum = "e378c50e80686c1c5c205674e1f86a2858bec3d2a7dfdd690331a8a19330f293"
dependencies = [
"cpp_demangle",
"rustc-demangle",
@ -5660,9 +5667,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.26"
version = "2.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45c3457aacde3c65315de5031ec191ce46604304d2446e803d71ade03308d970"
checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0"
dependencies = [
"proc-macro2",
"quote",
@ -5785,7 +5792,7 @@ checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -5914,7 +5921,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -5986,9 +5993,9 @@ dependencies = [
[[package]]
name = "toml_edit"
version = "0.19.12"
version = "0.19.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c500344a19072298cd05a7224b3c0c629348b78692bf48466c5238656e315a78"
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
dependencies = [
"indexmap 2.0.0",
"serde",
@ -6187,7 +6194,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
]
[[package]]
@ -6302,9 +6309,9 @@ checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "ucd-trie"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e79c4d996edb816c91e4308506774452e55e95c3c9de07b6729e17e15a5ef81"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "unarray"
@ -6320,9 +6327,9 @@ checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
version = "1.0.9"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0"
checksum = "301abaae475aa91687eb82514b328ab47a211a533026cb25fc3e519b86adfc3c"
[[package]]
name = "unicode-normalization"
@ -6500,7 +6507,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
"wasm-bindgen-shared",
]
@ -6534,7 +6541,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.26",
"syn 2.0.27",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -6796,9 +6803,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
[[package]]
name = "winnow"
version = "0.4.7"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca0ace3845f0d96209f0375e6d367e3eb87eb65d27d445bdc9f1843a26f39448"
checksum = "25b5872fa2e10bd067ae946f927e726d7d603eaeb6e02fa6a350e0722d2b8c11"
dependencies = [
"memchr",
]
@ -6890,7 +6897,7 @@ dependencies = [
"sqlx-postgres",
"sqlx-sqlite",
"syn 1.0.109",
"syn 2.0.26",
"syn 2.0.27",
"thrift",
"tokio",
"tokio-stream",

View File

@ -0,0 +1,51 @@
use std::sync::Arc;
use compactor_scheduler::{
CommitUpdate, CompactionJob, CompactionJobStatus, CompactionJobStatusResponse,
CompactionJobStatusVariant, Scheduler,
};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
#[derive(Debug)]
pub struct CommitToScheduler {
scheduler: Arc<dyn Scheduler>,
}
impl CommitToScheduler {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
pub async fn commit(
&self,
partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>, crate::DynError> {
match self
.scheduler
.update_job_status(CompactionJobStatus {
job: CompactionJob::new(partition_id),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
partition_id,
delete.into(),
upgrade.into(),
create.into(),
target_level,
)),
})
.await?
{
CompactionJobStatusResponse::CreatedParquetFiles(ids) => Ok(ids),
CompactionJobStatusResponse::Ack => unreachable!("scheduler should not ack"),
}
}
}
impl std::fmt::Display for CommitToScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CommitToScheduler")
}
}
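Illustrative sketch (not part of this diff): how compactor code might drive the new scheduler-backed commit path. The helper name is a placeholder; it assumes it lives inside the compactor crate so that `CommitToScheduler` and `crate::DynError` are in scope.
use std::sync::Arc;
use compactor_scheduler::Scheduler;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
// Hypothetical helper: all commit traffic now flows through the scheduler,
// so the compactor only needs a CommitToScheduler handle.
async fn commit_compaction_outcome(
    scheduler: Arc<dyn Scheduler>,
    partition_id: PartitionId,
    delete: &[ParquetFile],
    upgrade: &[ParquetFile],
    create: &[ParquetFileParams],
    target_level: CompactionLevel,
) -> Result<Vec<ParquetFileId>, crate::DynError> {
    let commit = CommitToScheduler::new(scheduler);
    // Internally this becomes a CompactionJobStatusVariant::Update sent via update_job_status().
    commit
        .commit(partition_id, delete, upgrade, create, target_level)
        .await
}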

View File

@ -12,11 +12,7 @@ use crate::{config::Config, error::ErrorKind, object_store::ignore_writes::Ignor
use super::{
changed_files_filter::logging::LoggingChangedFiles,
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
commit::{
catalog::CatalogCommit, logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper,
mock::MockCommit, Commit,
},
commit::CommitToScheduler,
df_plan_exec::{
dedicated::DedicatedDataFusionPlanExec, noop::NoopDataFusionPlanExec, DataFusionPlanExec,
},
@ -39,9 +35,9 @@ use super::{
},
parquet_files_sink::{dispatch::DispatchParquetFilesSink, ParquetFilesSink},
partition_done_sink::{
catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
mock::MockPartitionDoneSink, PartitionDoneSink,
error_kind::ErrorKindPartitionDoneSinkWrapper, logging::LoggingPartitionDoneSinkWrapper,
metrics::MetricsPartitionDoneSinkWrapper, outcome::PartitionDoneSinkToScheduler,
PartitionDoneSink,
},
partition_files_source::{
catalog::{CatalogPartitionFilesSource, QueryRateLimiter},
@ -93,6 +89,8 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
config.scheduler_config.clone(),
Arc::clone(&config.catalog),
Arc::clone(&config.time_provider),
Arc::clone(&config.metric_registry),
config.shadow_mode,
);
let (partitions_source, commit, partition_done_sink) =
make_partitions_source_commit_partition_sink(config, Arc::clone(&scheduler));
@ -123,52 +121,17 @@ fn make_partitions_source_commit_partition_sink(
scheduler: Arc<dyn Scheduler>,
) -> (
Arc<dyn PartitionsSource>,
Arc<dyn Commit>,
Arc<CommitToScheduler>,
Arc<dyn PartitionDoneSink>,
) {
let partitions_source = ScheduledPartitionsSource::new(scheduler);
let partitions_source = ScheduledPartitionsSource::new(Arc::clone(&scheduler));
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.shadow_mode {
Arc::new(MockPartitionDoneSink::new())
} else {
Arc::new(CatalogPartitionDoneSink::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
))
};
let commit = CommitToScheduler::new(Arc::clone(&scheduler));
let commit: Arc<dyn Commit> = if config.shadow_mode {
Arc::new(MockCommit::new())
} else {
Arc::new(CatalogCommit::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
))
};
let commit = if let Some(commit_wrapper) = config.commit_wrapper.as_ref() {
commit_wrapper.wrap(commit)
} else {
commit
};
let (partitions_source, partition_done_sink) =
unique_partitions(partitions_source, partition_done_sink, 1);
let (partitions_source, commit, partition_done_sink) = throttle_partition(
partitions_source,
commit,
partition_done_sink,
Arc::clone(&config.time_provider),
Duration::from_secs(60),
1,
);
let commit = Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
commit,
&config.metric_registry,
)));
let partition_done_sink = PartitionDoneSinkToScheduler::new(Arc::clone(&scheduler));
// compactors are responsible for error classification
// and any future decisions regarding graceful shutdown
let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.all_errors_are_fatal {
Arc::new(partition_done_sink)
} else {
@ -185,6 +148,7 @@ fn make_partitions_source_commit_partition_sink(
})
.copied()
.collect(),
scheduler,
))
};
let partition_done_sink = Arc::new(LoggingPartitionDoneSinkWrapper::new(
@ -210,7 +174,7 @@ fn make_partitions_source_commit_partition_sink(
))
};
(partitions_source, commit, partition_done_sink)
(partitions_source, Arc::new(commit), partition_done_sink)
}
fn make_partition_stream(

View File

@ -1,9 +1,9 @@
use std::sync::Arc;
use self::{
changed_files_filter::ChangedFilesFilter, commit::Commit, df_plan_exec::DataFusionPlanExec,
df_planner::DataFusionPlanner, divide_initial::DivideInitial, file_classifier::FileClassifier,
ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink,
changed_files_filter::ChangedFilesFilter, commit::CommitToScheduler,
df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner, divide_initial::DivideInitial,
file_classifier::FileClassifier, ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink,
partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource,
partition_stream::PartitionStream,
@ -12,8 +12,7 @@ use self::{
};
pub mod changed_files_filter;
pub mod combos;
pub mod commit;
pub(crate) mod commit;
pub mod df_plan_exec;
pub mod df_planner;
pub mod divide_initial;
@ -61,8 +60,8 @@ pub struct Components {
pub post_classification_partition_filter: Arc<dyn PostClassificationPartitionFilter>,
/// Records "partition is done" status for given partition.
pub partition_done_sink: Arc<dyn PartitionDoneSink>,
/// Commits changes (i.e. deletion and creation) to the catalog.
pub commit: Arc<dyn Commit>,
/// Commits changes (i.e. deletion and creation).
pub commit: Arc<CommitToScheduler>,
/// Creates `PlanIR` that describes what files should be compacted and updated
pub ir_planner: Arc<dyn IRPlanner>,
/// Creates an Execution plan for a `PlanIR`

View File

@ -1,6 +1,10 @@
use std::{collections::HashSet, fmt::Display};
use std::{collections::HashSet, fmt::Display, sync::Arc};
use async_trait::async_trait;
use compactor_scheduler::{
CompactionJob, CompactionJobStatus, CompactionJobStatusResponse, CompactionJobStatusVariant,
ErrorKind as SchedulerErrorKind, Scheduler,
};
use data_types::PartitionId;
use crate::error::{DynError, ErrorKind, ErrorKindExt};
@ -14,14 +18,19 @@ where
{
kind: HashSet<ErrorKind>,
inner: T,
scheduler: Arc<dyn Scheduler>,
}
impl<T> ErrorKindPartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
pub fn new(inner: T, kind: HashSet<ErrorKind>) -> Self {
Self { kind, inner }
pub fn new(inner: T, kind: HashSet<ErrorKind>, scheduler: Arc<dyn Scheduler>) -> Self {
Self {
kind,
inner,
scheduler,
}
}
}
@ -41,13 +50,42 @@ impl<T> PartitionDoneSink for ErrorKindPartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError> {
match res {
Ok(()) => self.inner.record(partition, Ok(())).await,
Err(e) if self.kind.contains(&e.classify()) => {
self.inner.record(partition, Err(e)).await;
let scheduler_error = match SchedulerErrorKind::from(e.classify()) {
SchedulerErrorKind::OutOfMemory => SchedulerErrorKind::OutOfMemory,
SchedulerErrorKind::ObjectStore => SchedulerErrorKind::ObjectStore,
SchedulerErrorKind::Timeout => SchedulerErrorKind::Timeout,
SchedulerErrorKind::Unknown(_) => SchedulerErrorKind::Unknown(e.to_string()),
};
match self
.scheduler
.update_job_status(CompactionJobStatus {
job: CompactionJob::new(partition),
status: CompactionJobStatusVariant::Error(scheduler_error),
})
.await?
{
CompactionJobStatusResponse::Ack => {}
CompactionJobStatusResponse::CreatedParquetFiles(_) => {
unreachable!("scheduler should not created parquet files")
}
}
self.inner.record(partition, Err(e)).await
}
Err(e) => {
// contract of this abstraction,
// where we do not pass to `self.inner` if not in `self.kind`
Err(e)
}
_ => {}
}
}
}
@ -56,18 +94,24 @@ where
mod tests {
use std::{collections::HashMap, sync::Arc};
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use compactor_scheduler::create_test_scheduler;
use datafusion::error::DataFusionError;
use iox_tests::TestCatalog;
use iox_time::{MockProvider, Time};
use object_store::Error as ObjectStoreError;
use super::*;
use super::{super::mock::MockPartitionDoneSink, *};
#[test]
fn test_display() {
let sink = ErrorKindPartitionDoneSinkWrapper::new(
MockPartitionDoneSink::new(),
HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]),
create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
None,
),
);
assert_eq!(sink.to_string(), "kind([ObjectStore, OutOfMemory], mock)");
}
@ -78,22 +122,33 @@ mod tests {
let sink = ErrorKindPartitionDoneSinkWrapper::new(
Arc::clone(&inner),
HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]),
create_test_scheduler(
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
None,
),
);
sink.record(
PartitionId::new(1),
Err(Box::new(ObjectStoreError::NotImplemented)),
)
.await;
.await
.expect("record failed");
sink.record(
PartitionId::new(2),
Err(Box::new(DataFusionError::ResourcesExhausted(String::from(
"foo",
)))),
)
.await;
sink.record(PartitionId::new(3), Err("foo".into())).await;
sink.record(PartitionId::new(4), Ok(())).await;
.await
.expect("record failed");
sink.record(PartitionId::new(3), Err("foo".into()))
.await
.unwrap_err();
sink.record(PartitionId::new(4), Ok(()))
.await
.expect("record failed");
assert_eq!(
inner.results(),

View File

@ -39,12 +39,17 @@ impl<T> PartitionDoneSink for LoggingPartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError> {
match &res {
Ok(()) => {
info!(partition_id = partition.get(), "Finished partition",);
}
Err(e) => {
// log compactor errors, classified by compactor ErrorKind
error!(
%e,
kind=e.classify().name(),
@ -53,7 +58,7 @@ where
);
}
}
self.inner.record(partition, res).await;
self.inner.record(partition, res).await
}
}
@ -64,9 +69,7 @@ mod tests {
use object_store::Error as ObjectStoreError;
use test_helpers::tracing::TracingCapture;
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use super::*;
use super::{super::mock::MockPartitionDoneSink, *};
#[test]
fn test_display() {
@ -81,14 +84,21 @@ mod tests {
let capture = TracingCapture::new();
sink.record(PartitionId::new(1), Err("msg 1".into())).await;
sink.record(PartitionId::new(2), Err("msg 2".into())).await;
sink.record(PartitionId::new(1), Err("msg 1".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(2), Err("msg 2".into()))
.await
.expect("record failed");
sink.record(
PartitionId::new(1),
Err(Box::new(ObjectStoreError::NotImplemented)),
)
.await;
sink.record(PartitionId::new(3), Ok(())).await;
.await
.expect("record failed");
sink.record(PartitionId::new(3), Ok(()))
.await
.expect("record failed");
assert_eq!(
capture.to_string(),

View File

@ -62,12 +62,17 @@ impl<T> PartitionDoneSink for MetricsPartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError> {
match &res {
Ok(()) => {
self.ok_counter.inc(1);
}
Err(e) => {
// classify and track counts of compactor ErrorKind
let kind = e.classify();
self.error_counter
.get(&kind)
@ -75,7 +80,7 @@ where
.inc(1);
}
}
self.inner.record(partition, res).await;
self.inner.record(partition, res).await
}
}
@ -86,9 +91,7 @@ mod tests {
use metric::{assert_counter, Attributes};
use object_store::Error as ObjectStoreError;
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use super::*;
use super::{super::mock::MockPartitionDoneSink, *};
#[test]
fn test_display() {
@ -107,14 +110,21 @@ mod tests {
assert_error_counter(&registry, "unknown", 0);
assert_error_counter(&registry, "object_store", 0);
sink.record(PartitionId::new(1), Err("msg 1".into())).await;
sink.record(PartitionId::new(2), Err("msg 2".into())).await;
sink.record(PartitionId::new(1), Err("msg 1".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(2), Err("msg 2".into()))
.await
.expect("record failed");
sink.record(
PartitionId::new(1),
Err(Box::new(ObjectStoreError::NotImplemented)),
)
.await;
sink.record(PartitionId::new(3), Ok(())).await;
.await
.expect("record failed");
sink.record(PartitionId::new(3), Ok(()))
.await
.expect("record failed");
assert_ok_counter(&registry, 1);
assert_error_counter(&registry, "unknown", 2);

View File

@ -3,21 +3,23 @@ use std::{collections::HashMap, fmt::Display, sync::Mutex};
use async_trait::async_trait;
use data_types::PartitionId;
use crate::error::DynError;
use super::PartitionDoneSink;
use super::{DynError, PartitionDoneSink};
/// Mock for [`PartitionDoneSink`].
#[derive(Debug, Default)]
pub struct MockPartitionDoneSink {
last: Mutex<HashMap<PartitionId, Result<(), String>>>,
}
impl MockPartitionDoneSink {
/// Create new mock.
#[allow(dead_code)] // used for testing
pub fn new() -> Self {
Self::default()
}
#[allow(dead_code)] // not used anywhere
/// Get the last recorded results.
#[allow(dead_code)] // used for testing
pub fn results(&self) -> HashMap<PartitionId, Result<(), String>> {
self.last.lock().expect("not poisoned").clone()
}
@ -31,11 +33,16 @@ impl Display for MockPartitionDoneSink {
#[async_trait]
impl PartitionDoneSink for MockPartitionDoneSink {
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError> {
self.last
.lock()
.expect("not poisoned")
.insert(partition, res.map_err(|e| e.to_string()));
Ok(())
}
}
@ -54,10 +61,18 @@ mod tests {
assert_eq!(sink.results(), HashMap::default(),);
sink.record(PartitionId::new(1), Err("msg 1".into())).await;
sink.record(PartitionId::new(2), Err("msg 2".into())).await;
sink.record(PartitionId::new(1), Err("msg 3".into())).await;
sink.record(PartitionId::new(3), Ok(())).await;
sink.record(PartitionId::new(1), Err("msg 1".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(2), Err("msg 2".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(1), Err("msg 3".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(3), Ok(()))
.await
.expect("record failed");
assert_eq!(
sink.results(),

View File

@ -6,13 +6,13 @@ use std::{
use async_trait::async_trait;
use data_types::PartitionId;
use crate::error::DynError;
use crate::DynError;
pub mod catalog;
pub mod error_kind;
pub mod logging;
pub mod metrics;
pub mod mock;
pub mod outcome;
/// Records "partition is done" status for given partition.
#[async_trait]
@ -20,7 +20,11 @@ pub trait PartitionDoneSink: Debug + Display + Send + Sync {
/// Record "partition is done" status for given partition.
///
/// This method should retry.
async fn record(&self, partition: PartitionId, res: Result<(), DynError>);
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError>;
}
#[async_trait]
@ -28,7 +32,11 @@ impl<T> PartitionDoneSink for Arc<T>
where
T: PartitionDoneSink + ?Sized,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError> {
self.as_ref().record(partition, res).await
}
}

View File

@ -0,0 +1,47 @@
use std::{fmt::Display, sync::Arc};
use async_trait::async_trait;
use compactor_scheduler::{
CompactionJob, CompactionJobEnd, CompactionJobEndVariant, Scheduler, SkipReason,
};
use data_types::PartitionId;
use crate::DynError;
use super::PartitionDoneSink;
#[derive(Debug)]
pub struct PartitionDoneSinkToScheduler {
scheduler: Arc<dyn Scheduler>,
}
impl PartitionDoneSinkToScheduler {
pub fn new(scheduler: Arc<dyn Scheduler>) -> Self {
Self { scheduler }
}
}
impl Display for PartitionDoneSinkToScheduler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "PartitionDoneSinkToScheduler")
}
}
#[async_trait]
impl PartitionDoneSink for PartitionDoneSinkToScheduler {
async fn record(
&self,
partition: PartitionId,
res: Result<(), DynError>,
) -> Result<(), DynError> {
let end_action = CompactionJobEnd {
job: CompactionJob::new(partition),
end_action: match res {
Ok(_) => CompactionJobEndVariant::Complete,
Err(e) => CompactionJobEndVariant::RequestToSkip(SkipReason(e.to_string())),
},
};
self.scheduler.end_job(end_action).await
}
}

View File

@ -34,7 +34,6 @@ pub fn log_config(config: &Config) {
min_num_l1_files_to_compact,
process_once,
parquet_files_sink_override,
commit_wrapper,
simulate_without_object_store,
all_errors_are_fatal,
max_num_columns_per_table,
@ -47,8 +46,6 @@ pub fn log_config(config: &Config) {
.map(|_| "Some")
.unwrap_or("None");
let commit_wrapper = commit_wrapper.as_ref().map(|_| "Some").unwrap_or("None");
info!(
%catalog,
%scheduler_config,
@ -71,7 +68,6 @@ pub fn log_config(config: &Config) {
process_once,
simulate_without_object_store,
%parquet_files_sink_override,
%commit_wrapper,
all_errors_are_fatal,
max_num_columns_per_table,
max_num_files_per_plan,

View File

@ -8,7 +8,7 @@ use iox_query::exec::Executor;
use iox_time::TimeProvider;
use parquet_file::storage::ParquetStorage;
use crate::components::{commit::CommitWrapper, parquet_files_sink::ParquetFilesSink};
use crate::components::parquet_files_sink::ParquetFilesSink;
/// Multiple from `max_desired_file_size_bytes` to compute the minimum value for
/// `max_compact_size_bytes`. Since `max_desired_file_size_bytes` is softly enforced, actual file
@ -122,11 +122,6 @@ pub struct Config {
/// (used for testing)
pub parquet_files_sink_override: Option<Arc<dyn ParquetFilesSink>>,
/// Optionally wrap the `Commit` instance
///
/// This is mostly used for testing
pub commit_wrapper: Option<Arc<dyn CommitWrapper>>,
/// Ensure that ALL errors (including object store errors) result in "skipped" partitions.
///
/// This is mostly useful for testing.

View File

@ -82,7 +82,7 @@ async fn compact_partition(
scratchpad,
transmit_progress_signal,
)
.await
.await // errors detected in the CompactionJob update_job_status() will be handled by the timeout_with_progress_checking
}
})
.await;
@ -104,7 +104,9 @@ async fn compact_partition(
// to the `skipped_compactions` table or not.
TimeoutWithProgress::Completed(res) => res,
};
components
// TODO: how to handle errors detected in the CompactionJob ending actions?
let _ = components
.partition_done_sink
.record(partition_id, res)
.await;
@ -423,7 +425,7 @@ async fn execute_branch(
created_file_params,
target_level,
)
.await;
.await?;
// we only need to upgrade files on the first iteration, so empty the upgrade list for next loop.
upgrade = Vec::new();
@ -631,7 +633,7 @@ async fn update_catalog(
files_to_upgrade: Vec<ParquetFile>,
file_params_to_create: Vec<ParquetFileParams>,
target_level: CompactionLevel,
) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> {
let current_parquet_file_state =
fetch_and_save_parquet_file_state(&components, partition_id).await;
@ -649,7 +651,7 @@ async fn update_catalog(
&file_params_to_create,
target_level,
)
.await;
.await?;
// Update created ids to their corresponding file params
let created_file_params = file_params_to_create
@ -667,7 +669,7 @@ async fn update_catalog(
})
.collect::<Vec<_>>();
(created_file_params, upgraded_files)
Ok((created_file_params, upgraded_files))
}
// SINGLE_THREADED_COLUMN_COUNT is the number of columns requiring a partition be compacted single threaded.

View File

@ -1,5 +1,6 @@
//! Error handling.
use compactor_scheduler::ErrorKind as SchedulerErrorKind;
use datafusion::{arrow::error::ArrowError, error::DataFusionError, parquet::errors::ParquetError};
use object_store::Error as ObjectStoreError;
use std::{error::Error, fmt::Display, sync::Arc};
@ -51,6 +52,17 @@ impl ErrorKind {
}
}
impl From<ErrorKind> for SchedulerErrorKind {
fn from(e: ErrorKind) -> Self {
match e {
ErrorKind::ObjectStore => Self::ObjectStore,
ErrorKind::OutOfMemory => Self::OutOfMemory,
ErrorKind::Timeout => Self::Timeout,
ErrorKind::Unknown => Self::Unknown("".into()),
}
}
}
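Illustrative sketch (not part of this diff): a hypothetical test pinning down the conversion above; note that the message is dropped when mapping the compactor's catch-all `Unknown` variant.
#[cfg(test)]
mod error_kind_conversion_tests {
    use super::*;
    #[test]
    fn compactor_error_kind_maps_to_scheduler_error_kind() {
        assert_eq!(
            SchedulerErrorKind::from(ErrorKind::OutOfMemory),
            SchedulerErrorKind::OutOfMemory
        );
        // the compactor variant carries no message, so the scheduler side gets an empty string
        assert_eq!(
            SchedulerErrorKind::from(ErrorKind::Unknown),
            SchedulerErrorKind::Unknown(String::new())
        );
    }
}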
impl Display for ErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())

View File

@ -221,12 +221,8 @@ mod round_info;
// publicly expose items needed for testing
pub use components::{
commit::{Commit, CommitWrapper},
df_planner::panic::PanicDataFusionPlanner,
hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper,
parquet_files_sink::ParquetFilesSink,
Components,
df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper, parquet_files_sink::ParquetFilesSink, Components,
};
pub use driver::compact;
pub use error::DynError;

View File

@ -9,13 +9,20 @@ license.workspace = true
async-trait = "0.1.72"
backoff = { path = "../backoff" }
data_types = { path = "../data_types" }
futures = "0.3"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
itertools = "0.11.0"
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12.1"
sharder = { path = "../sharder" }
thiserror = "1.0"
uuid = { version = "1", features = ["v4"] }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1.5.0"
iox_tests = { path = "../iox_tests" }
test_helpers = { path = "../test_helpers"}
tokio = { version = "1.29", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }

View File

@ -4,10 +4,10 @@ use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use observability_deps::tracing::info;
use super::Commit;
use super::{Commit, Error};
#[derive(Debug)]
pub struct LoggingCommitWrapper<T>
pub(crate) struct LoggingCommitWrapper<T>
where
T: Commit,
{
@ -18,7 +18,7 @@ impl<T> LoggingCommitWrapper<T>
where
T: Commit,
{
pub fn new(inner: T) -> Self {
pub(crate) fn new(inner: T) -> Self {
Self { inner }
}
}
@ -44,12 +44,12 @@ where
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
) -> Result<Vec<ParquetFileId>, Error> {
// Perform commit first and report status AFTERWARDS.
let created = self
.inner
.commit(partition_id, delete, upgrade, create, target_level)
.await;
.await?;
// Log numbers BEFORE IDs because the list may be so long that we hit the line-length limit. In this case we at
least have the important information. Note that the message is always printed first, so we'll never lose
@ -72,7 +72,7 @@ where
"committed parquet file change",
);
created
Ok(created)
}
}
@ -80,10 +80,11 @@ where
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use test_helpers::tracing::TracingCapture;
use super::*;
use crate::components::commit::mock::{CommitHistoryEntry, MockCommit};
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
use iox_tests::ParquetFileBuilder;
#[test]
@ -124,9 +125,9 @@ mod tests {
CompactionLevel::Final,
)
.await;
assert_eq!(
assert_matches!(
ids,
vec![ParquetFileId::new(1000), ParquetFileId::new(1001)]
Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)]
);
let ids = commit
@ -138,7 +139,7 @@ mod tests {
CompactionLevel::Final,
)
.await;
assert_eq!(ids, vec![]);
assert_matches!(ids, Ok(res) if res == vec![]);
assert_eq!(
capture.to_string(),

View File

@ -5,7 +5,7 @@ use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams,
use itertools::Itertools;
use metric::{Registry, U64Histogram, U64HistogramOptions};
use super::Commit;
use super::{Commit, Error};
#[derive(Debug, Clone, Copy)]
enum HistogramType {
@ -102,7 +102,7 @@ impl Histogram {
}
#[derive(Debug)]
pub struct MetricsCommitWrapper<T>
pub(crate) struct MetricsCommitWrapper<T>
where
T: Commit,
{
@ -124,7 +124,7 @@ impl<T> MetricsCommitWrapper<T>
where
T: Commit,
{
pub fn new(inner: T, registry: &Registry) -> Self {
pub(crate) fn new(inner: T, registry: &Registry) -> Self {
Self {
file_bytes: Histogram::new(
registry,
@ -182,12 +182,12 @@ where
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
) -> Result<Vec<ParquetFileId>, Error> {
// Perform commit first and report status AFTERWARDS.
let ids = self
.inner
.commit(partition_id, delete, upgrade, create, target_level)
.await;
.await?;
// per file metrics
for f in create {
@ -297,7 +297,7 @@ where
.record(upgrade.iter().map(|f| f.row_count as u64).sum::<u64>());
}
ids
Ok(ids)
}
}
@ -305,9 +305,10 @@ where
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use metric::{assert_histogram, Attributes};
use crate::components::commit::mock::{CommitHistoryEntry, MockCommit};
use crate::commit::mock::{CommitHistoryEntry, MockCommit};
use iox_tests::ParquetFileBuilder;
use super::*;
@ -398,7 +399,7 @@ mod tests {
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(ids, vec![ParquetFileId::new(1000)]);
assert_matches!(ids, Ok(res) if res == vec![ParquetFileId::new(1000)]);
let ids = commit
.commit(
@ -409,7 +410,7 @@ mod tests {
CompactionLevel::Final,
)
.await;
assert_eq!(ids, vec![]);
assert_matches!(ids, Ok(res) if res == vec![]);
assert_histogram!(
registry,

View File

@ -1,34 +1,32 @@
use std::{
fmt::Display,
sync::{
atomic::{AtomicI64, Ordering},
Mutex,
},
sync::atomic::{AtomicI64, Ordering},
};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use parking_lot::Mutex;
use super::Commit;
use super::{Commit, Error};
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CommitHistoryEntry {
pub partition_id: PartitionId,
pub delete: Vec<ParquetFile>,
pub upgrade: Vec<ParquetFile>,
pub created: Vec<ParquetFile>,
pub target_level: CompactionLevel,
pub(crate) struct CommitHistoryEntry {
pub(crate) partition_id: PartitionId,
pub(crate) delete: Vec<ParquetFile>,
pub(crate) upgrade: Vec<ParquetFile>,
pub(crate) created: Vec<ParquetFile>,
pub(crate) target_level: CompactionLevel,
}
#[derive(Debug)]
pub struct MockCommit {
#[derive(Debug, Default)]
pub(crate) struct MockCommit {
history: Mutex<Vec<CommitHistoryEntry>>,
id_counter: AtomicI64,
}
impl MockCommit {
#[allow(dead_code)] // not used anywhere
pub fn new() -> Self {
pub(crate) fn new() -> Self {
Self {
history: Default::default(),
id_counter: AtomicI64::new(1000),
@ -36,8 +34,8 @@ impl MockCommit {
}
#[allow(dead_code)] // not used anywhere
pub fn history(&self) -> Vec<CommitHistoryEntry> {
self.history.lock().expect("not poisoned").clone()
pub(crate) fn history(&self) -> Vec<CommitHistoryEntry> {
self.history.lock().clone()
}
}
@ -56,7 +54,7 @@ impl Commit for MockCommit {
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
) -> Result<Vec<ParquetFileId>, Error> {
let (created, ids): (Vec<_>, Vec<_>) = create
.iter()
.map(|params| {
@ -66,23 +64,21 @@ impl Commit for MockCommit {
})
.unzip();
self.history
.lock()
.expect("not poisoned")
.push(CommitHistoryEntry {
partition_id,
delete: delete.to_vec(),
upgrade: upgrade.to_vec(),
created,
target_level,
});
self.history.lock().push(CommitHistoryEntry {
partition_id,
delete: delete.to_vec(),
upgrade: upgrade.to_vec(),
created,
target_level,
});
ids
Ok(ids)
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use iox_tests::ParquetFileBuilder;
use super::*;
@ -119,9 +115,9 @@ mod tests {
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(
assert_matches!(
ids,
vec![ParquetFileId::new(1000), ParquetFileId::new(1001)]
Ok(res) if res == vec![ParquetFileId::new(1000), ParquetFileId::new(1001)]
);
let ids = commit
@ -133,7 +129,10 @@ mod tests {
CompactionLevel::Final,
)
.await;
assert_eq!(ids, vec![ParquetFileId::new(1002)]);
assert_matches!(
ids,
Ok(res) if res == vec![ParquetFileId::new(1002)]
);
let ids = commit
.commit(
@ -144,7 +143,10 @@ mod tests {
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(ids, vec![ParquetFileId::new(1003)]);
assert_matches!(
ids,
Ok(res) if res == vec![ParquetFileId::new(1003)]
);
// simulate fill implosion of the file (this may happen w/ delete predicates)
let ids = commit
@ -156,7 +158,10 @@ mod tests {
CompactionLevel::FileNonOverlapped,
)
.await;
assert_eq!(ids, vec![]);
assert_matches!(
ids,
Ok(res) if res == vec![]
);
assert_eq!(
commit.history(),

View File

@ -6,10 +6,25 @@ use std::{
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
pub mod catalog;
pub mod logging;
pub mod metrics;
pub mod mock;
pub(crate) mod logging;
pub(crate) mod metrics;
pub(crate) mod mock;
/// Error returned by [`Commit`] implementations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Commit request was malformed
#[error("Bad commit request: {0}")]
BadRequest(String),
/// Commit succeeded, but catalog returned an invalid result
#[error("Result from catalog is invalid: {0}")]
InvalidCatalogResult(String),
/// Commit failed because of an error in the throttler
#[error("Failure in throttler: {0}")]
ThrottlerError(#[from] crate::ThrottleError),
}
/// Ensures that the file changes (i.e. deletion and creation) are committed to the catalog.
#[async_trait]
@ -27,7 +42,7 @@ pub trait Commit: Debug + Display + Send + Sync {
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId>;
) -> Result<Vec<ParquetFileId>, crate::commit::Error>;
}
/// Something that can wrap `Commit` instances
@ -50,7 +65,7 @@ where
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
) -> Result<Vec<ParquetFileId>, crate::commit::Error> {
self.as_ref()
.commit(partition_id, delete, upgrade, create, target_level)
.await
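Illustrative sketch (not part of this diff): how scheduler-internal code might distinguish the new error variants at a commit call site. The helper and its logging are placeholders; the types are the ones defined above.
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use observability_deps::tracing::warn;
use crate::commit::{Commit, Error};
// Hypothetical helper inside compactor_scheduler.
async fn commit_or_warn(
    commit: &dyn Commit,
    partition_id: PartitionId,
    delete: &[ParquetFile],
    upgrade: &[ParquetFile],
    create: &[ParquetFileParams],
    target_level: CompactionLevel,
) -> Option<Vec<ParquetFileId>> {
    match commit
        .commit(partition_id, delete, upgrade, create, target_level)
        .await
    {
        Ok(ids) => Some(ids),
        // the request shape was invalid (e.g. nothing to upgrade, delete, or create)
        Err(Error::BadRequest(msg)) => {
            warn!(%msg, "bad commit request");
            None
        }
        // the catalog answered, but not with one id per created file
        Err(Error::InvalidCatalogResult(msg)) => {
            warn!(%msg, "invalid catalog result");
            None
        }
        // the throttling layer wrapped around the commit failed
        Err(Error::ThrottlerError(e)) => {
            warn!(error=%e, "throttler failure");
            None
        }
    }
}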

View File

@ -0,0 +1,51 @@
use thiserror::Error;
/// Scheduler error.
#[derive(Debug, Error)]
pub(crate) enum Error {
#[error("Commit error: {0}")]
Commit(#[from] crate::commit::Error),
#[error("ThrottleError error: {0}")]
ThrottleError(#[from] crate::ThrottleError),
#[error("UniquePartitions error: {0}")]
UniqueSource(#[from] crate::UniquePartitionsError),
}
/// Compactor Error classification.
/// What kind of error occurred during compaction?
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum ErrorKind {
/// Could not access the object store.
ObjectStore,
/// We ran out of memory (OOM).
OutOfMemory,
/// Partition took too long.
Timeout,
/// Unknown/unexpected error.
///
/// This will likely mark the affected partition as "skipped" and the compactor will no longer touch it.
Unknown(String),
}
impl ErrorKind {
/// Return static name.
pub fn name(&self) -> &'static str {
match self {
Self::ObjectStore => "object_store",
Self::OutOfMemory => "out_of_memory",
Self::Timeout => "timeout",
Self::Unknown(_) => "unknown",
}
}
}
impl std::fmt::Display for ErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}
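Illustrative sketch (not part of this diff): a hypothetical test asserting that each variant keeps a stable string name, with `Unknown` retaining its message but not changing the label.
#[cfg(test)]
mod error_kind_name_tests {
    use super::ErrorKind;
    #[test]
    fn names_are_stable() {
        assert_eq!(ErrorKind::ObjectStore.name(), "object_store");
        assert_eq!(ErrorKind::Timeout.name(), "timeout");
        // the message lives in the variant, not in the name
        assert_eq!(ErrorKind::Unknown("disk unavailable".to_string()).name(), "unknown");
    }
}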

View File

@ -26,16 +26,32 @@ use iox_time::TimeProvider;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
pub(crate) mod commit;
pub(crate) use commit::mock::MockCommit;
pub use commit::{Commit, CommitWrapper, Error as CommitError};
mod error;
pub use error::ErrorKind;
mod local_scheduler;
pub(crate) use local_scheduler::{id_only_partition_filter::IdOnlyPartitionFilter, LocalScheduler};
// configurations used externally during scheduler setup
#[allow(unused_imports)] // for testing
pub(crate) use local_scheduler::partition_done_sink::mock::MockPartitionDoneSink;
pub use local_scheduler::{
combos::throttle_partition::Error as ThrottleError,
partitions_source_config::PartitionsSourceConfig, shard_config::ShardConfig,
LocalSchedulerConfig,
};
pub(crate) use local_scheduler::{
combos::unique_partitions::Error as UniquePartitionsError,
id_only_partition_filter::IdOnlyPartitionFilter,
partition_done_sink::Error as PartitionDoneSinkError, partition_done_sink::PartitionDoneSink,
LocalScheduler,
};
// partitions_source trait
mod partitions_source;
pub use partitions_source::*;
// scheduler trait and associated types
mod scheduler;
pub use scheduler::*;
@ -49,6 +65,8 @@ pub fn create_scheduler(
config: SchedulerConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
metrics: Arc<metric::Registry>,
shadow_mode: bool,
) -> Arc<dyn Scheduler> {
match config {
SchedulerConfig::Local(scheduler_config) => {
@ -57,6 +75,8 @@ pub fn create_scheduler(
BackoffConfig::default(),
catalog,
time_provider,
metrics,
shadow_mode,
);
Arc::new(scheduler)
}
@ -75,13 +95,20 @@ pub fn create_test_scheduler(
let scheduler_config = match mocked_partition_ids {
None => SchedulerConfig::default(),
Some(partition_ids) => SchedulerConfig::Local(LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: PartitionsSourceConfig::Fixed(
partition_ids.into_iter().collect::<HashSet<PartitionId>>(),
),
shard_config: None,
}),
};
create_scheduler(scheduler_config, catalog, time_provider)
create_scheduler(
scheduler_config,
catalog,
time_provider,
Arc::new(metric::Registry::default()),
false,
)
}
#[cfg(test)]

View File

@ -1,26 +1,37 @@
//! Internals used by [`LocalScheduler`].
pub(crate) mod catalog_commit;
pub(crate) mod combos;
pub(crate) mod id_only_partition_filter;
pub(crate) mod partition_done_sink;
pub(crate) mod partitions_source;
pub(crate) mod partitions_source_config;
pub(crate) mod shard_config;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use backoff::BackoffConfig;
use iox_catalog::interface::Catalog;
use iox_time::TimeProvider;
use observability_deps::tracing::info;
use observability_deps::tracing::{info, warn};
use crate::{
CompactionJob, MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler,
ShardConfig,
commit::{logging::LoggingCommitWrapper, metrics::MetricsCommitWrapper},
Commit, CommitUpdate, CommitWrapper, CompactionJob, CompactionJobEnd, CompactionJobEndVariant,
CompactionJobStatus, CompactionJobStatusResponse, CompactionJobStatusVariant, MockCommit,
MockPartitionsSource, PartitionsSource, PartitionsSourceConfig, Scheduler, ShardConfig,
SkipReason,
};
use self::{
catalog_commit::CatalogCommit,
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
id_only_partition_filter::{
and::AndIdOnlyPartitionFilter, shard::ShardPartitionFilter, IdOnlyPartitionFilter,
},
partition_done_sink::{
catalog::CatalogPartitionDoneSink, mock::MockPartitionDoneSink, PartitionDoneSink,
},
partitions_source::{
catalog_all::CatalogAllPartitionsSource,
catalog_to_compact::CatalogToCompactPartitionsSource,
@ -31,6 +42,10 @@ use self::{
/// Configuration specific to the local scheduler.
#[derive(Debug, Default, Clone)]
pub struct LocalSchedulerConfig {
/// Optionally wrap the `Commit` instance
///
/// This is mostly used for testing
pub commit_wrapper: Option<Arc<dyn CommitWrapper>>,
/// The partitions source config used by the local scheduler.
pub partitions_source_config: PartitionsSourceConfig,
/// The shard config used by the local scheduler.
@ -40,8 +55,14 @@ pub struct LocalSchedulerConfig {
/// Implementation of the scheduler for local (per compactor) scheduling.
#[derive(Debug)]
pub(crate) struct LocalScheduler {
/// Commits changes (i.e. deletion and creation) to the catalog
pub(crate) commit: Arc<dyn Commit>,
/// The partitions source to use for scheduling.
partitions_source: Arc<dyn PartitionsSource>,
/// The actions to take when a partition is done.
///
/// Includes tracking of partition (PartitionId) uniqueness and throttling.
partition_done_sink: Arc<dyn PartitionDoneSink>,
/// The shard config used for generating the PartitionsSource.
shard_config: Option<ShardConfig>,
}
@ -53,7 +74,47 @@ impl LocalScheduler {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
metrics: Arc<metric::Registry>,
shadow_mode: bool,
) -> Self {
let commit = Self::build_commit(
config.clone(),
backoff_config.clone(),
Arc::clone(&catalog),
metrics,
shadow_mode,
);
let partitions_source = Self::build_partitions_source(
config.clone(),
backoff_config.clone(),
Arc::clone(&catalog),
Arc::clone(&time_provider),
);
let (partitions_source, commit, partition_done_sink) = Self::build_partition_done_sink(
partitions_source,
commit,
backoff_config,
catalog,
time_provider,
shadow_mode,
);
Self {
commit,
partitions_source,
partition_done_sink,
shard_config: config.shard_config,
}
}
fn build_partitions_source(
config: LocalSchedulerConfig,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
) -> Arc<dyn PartitionsSource> {
let shard_config = config.shard_config;
let partitions_source: Arc<dyn PartitionsSource> = match &config.partitions_source_config {
PartitionsSourceConfig::CatalogRecentWrites { threshold } => {
@ -86,16 +147,75 @@ impl LocalScheduler {
shard_config.shard_id,
)));
}
let partitions_source: Arc<dyn PartitionsSource> =
Arc::new(FilterPartitionsSourceWrapper::new(
AndIdOnlyPartitionFilter::new(id_only_partition_filters),
partitions_source,
));
Self {
Arc::new(FilterPartitionsSourceWrapper::new(
AndIdOnlyPartitionFilter::new(id_only_partition_filters),
partitions_source,
shard_config,
}
))
}
fn build_partition_done_sink(
partitions_source: Arc<dyn PartitionsSource>,
commit: Arc<dyn Commit>,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
time_provider: Arc<dyn TimeProvider>,
shadow_mode: bool,
) -> (
Arc<dyn PartitionsSource>,
Arc<dyn Commit>,
Arc<dyn PartitionDoneSink>,
) {
let partition_done_sink: Arc<dyn PartitionDoneSink> = if shadow_mode {
Arc::new(MockPartitionDoneSink::new())
} else {
Arc::new(CatalogPartitionDoneSink::new(
backoff_config,
Arc::clone(&catalog),
))
};
let (partitions_source, partition_done_sink) =
unique_partitions(partitions_source, partition_done_sink, 1);
let (partitions_source, commit, partition_done_sink) = throttle_partition(
partitions_source,
commit,
partition_done_sink,
Arc::clone(&time_provider),
Duration::from_secs(60),
1,
);
(
Arc::new(partitions_source),
Arc::new(commit),
Arc::new(partition_done_sink),
)
}
fn build_commit(
config: LocalSchedulerConfig,
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
metrics_registry: Arc<metric::Registry>,
shadow_mode: bool,
) -> Arc<dyn Commit> {
let commit: Arc<dyn Commit> = if shadow_mode {
Arc::new(MockCommit::new())
} else {
Arc::new(CatalogCommit::new(backoff_config, Arc::clone(&catalog)))
};
let commit = if let Some(commit_wrapper) = &config.commit_wrapper {
commit_wrapper.wrap(commit)
} else {
commit
};
Arc::new(LoggingCommitWrapper::new(MetricsCommitWrapper::new(
commit,
&metrics_registry,
)))
}
}
@ -109,6 +229,57 @@ impl Scheduler for LocalScheduler {
.map(CompactionJob::new)
.collect()
}
async fn update_job_status(
&self,
job_status: CompactionJobStatus,
) -> Result<CompactionJobStatusResponse, Box<dyn std::error::Error + Send + Sync>> {
match job_status.status {
CompactionJobStatusVariant::Update(commit_update) => {
let CommitUpdate {
partition_id,
delete,
upgrade,
target_level,
create,
} = commit_update;
let result = self
.commit
.commit(partition_id, &delete, &upgrade, &create, target_level)
.await?;
Ok(CompactionJobStatusResponse::CreatedParquetFiles(result))
}
CompactionJobStatusVariant::Error(error_kind) => {
warn!("Error processing job: {:?}: {}", job_status.job, error_kind);
Ok(CompactionJobStatusResponse::Ack)
}
}
}
async fn end_job(
&self,
end: CompactionJobEnd,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match end.end_action {
CompactionJobEndVariant::RequestToSkip(SkipReason(msg)) => {
self.partition_done_sink
.record(end.job.partition_id, Err(msg.into()))
.await?;
Ok(())
}
CompactionJobEndVariant::Complete => {
self.partition_done_sink
.record(end.job.partition_id, Ok(()))
.await?;
// TODO: once uuid is handled properly, we can track the job completion
Ok(())
}
}
}
}
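Illustrative sketch (not part of this diff): the compactor-side call sequence these two methods serve: report a classified error, then end the job by asking for the partition to be skipped. The helper is a placeholder; the types are the public scheduler API used elsewhere in this change.
use std::sync::Arc;
use compactor_scheduler::{
    CompactionJob, CompactionJobEnd, CompactionJobEndVariant, CompactionJobStatus,
    CompactionJobStatusVariant, ErrorKind, Scheduler, SkipReason,
};
use data_types::PartitionId;
async fn report_timeout_and_skip(
    scheduler: Arc<dyn Scheduler>,
    partition_id: PartitionId,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 1. report the classified error; LocalScheduler logs it and replies with Ack
    let _ack = scheduler
        .update_job_status(CompactionJobStatus {
            // a fresh CompactionJob per call, mirroring the wrappers above,
            // until the job uuid is threaded through (see the TODO)
            job: CompactionJob::new(partition_id),
            status: CompactionJobStatusVariant::Error(ErrorKind::Timeout),
        })
        .await?;
    // 2. end the job, requesting that the partition be recorded as skipped
    scheduler
        .end_job(CompactionJobEnd {
            job: CompactionJob::new(partition_id),
            end_action: CompactionJobEndVariant::RequestToSkip(SkipReason(
                "partition took too long".to_string(),
            )),
        })
        .await
}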
impl std::fmt::Display for LocalScheduler {
@ -134,6 +305,8 @@ mod tests {
BackoffConfig::default(),
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Arc::new(metric::Registry::default()),
false,
);
assert_eq!(scheduler.to_string(), "local_compaction_scheduler",);
@ -147,6 +320,7 @@ mod tests {
});
let config = LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: PartitionsSourceConfig::default(),
shard_config,
};
@ -156,6 +330,8 @@ mod tests {
BackoffConfig::default(),
TestCatalog::new().catalog(),
Arc::new(MockProvider::new(Time::MIN)),
Arc::new(metric::Registry::default()),
false,
);
assert_eq!(


@ -5,16 +5,16 @@ use backoff::{Backoff, BackoffConfig};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use iox_catalog::interface::Catalog;
use super::Commit;
use crate::{commit::Error, Commit};
#[derive(Debug)]
pub struct CatalogCommit {
pub(crate) struct CatalogCommit {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl CatalogCommit {
pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
pub(crate) fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config,
catalog,
@ -37,13 +37,27 @@ impl Commit for CatalogCommit {
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
assert!(!upgrade.is_empty() || (!delete.is_empty() && !create.is_empty()));
) -> Result<Vec<ParquetFileId>, Error> {
let is_upgrade_commit = !upgrade.is_empty();
let is_replacement_commit = !delete.is_empty() || !create.is_empty();
let replacement_commit_is_ok = !delete.is_empty() && !create.is_empty();
match (is_upgrade_commit, is_replacement_commit) {
(false, false) => {
return Err(Error::BadRequest("commit must have files to upgrade, and/or a set of files to replace (delete and create)".into()));
}
(_, true) if !replacement_commit_is_ok => {
return Err(Error::BadRequest(
"replacement commits must have both files to delete and files to create".into(),
));
}
_ => {} // is ok
}
let delete = delete.iter().map(|f| f.id).collect::<Vec<_>>();
let upgrade = upgrade.iter().map(|f| f.id).collect::<Vec<_>>();
Backoff::new(&self.backoff_config)
let result = Backoff::new(&self.backoff_config)
.retry_all_errors("commit parquet file changes", || async {
let mut repos = self.catalog.repositories().await;
let parquet_files = repos.parquet_files();
@ -54,6 +68,16 @@ impl Commit for CatalogCommit {
Ok::<_, iox_catalog::interface::Error>(ids)
})
.await
.expect("retry forever")
.expect("retry forever");
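// sanity check: the catalog should return exactly one ID per file we asked it to create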
if result.len() != create.len() {
return Err(Error::InvalidCatalogResult(format!(
"Number of created parquet files is invalid: expected {} but found {}",
create.len(),
result.len()
)));
}
return Ok(result);
}
}
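The validation above boils down to a simple rule: a commit must upgrade at least one file and/or replace files, and a replacement must name both files to delete and files to create. A self-contained sketch of that rule (the predicate name and argument counts are illustrative, not part of the crate):

// Illustrative only: mirrors the validation in CatalogCommit::commit.
fn commit_shape_is_valid(n_delete: usize, n_upgrade: usize, n_create: usize) -> bool {
    let is_upgrade_commit = n_upgrade > 0;
    let is_replacement_commit = n_delete > 0 || n_create > 0;
    let replacement_commit_is_ok = n_delete > 0 && n_create > 0;
    match (is_upgrade_commit, is_replacement_commit) {
        (false, false) => false,                         // empty commit
        (_, true) if !replacement_commit_is_ok => false, // delete without create (or vice versa)
        _ => true,
    }
}

fn main() {
    assert!(commit_shape_is_valid(0, 1, 0));  // upgrade-only
    assert!(commit_shape_is_valid(2, 0, 3));  // replacement: delete + create
    assert!(commit_shape_is_valid(2, 1, 3));  // replacement plus upgrade
    assert!(!commit_shape_is_valid(0, 0, 0)); // nothing to do
    assert!(!commit_shape_is_valid(1, 0, 0)); // delete but nothing created
    assert!(!commit_shape_is_valid(0, 0, 1)); // create but nothing deleted
}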


@ -1,7 +1,7 @@
//! Combinations of multiple components that together can achieve one goal.
pub mod throttle_partition;
pub mod unique_partitions;
pub(crate) mod throttle_partition;
pub(crate) mod unique_partitions;
#[cfg(test)]
mod tests;


@ -1,15 +1,15 @@
use std::{sync::Arc, time::Duration};
use compactor_scheduler::{MockPartitionsSource, PartitionsSource};
use data_types::{CompactionLevel, PartitionId};
use iox_time::{MockProvider, Time};
use crate::components::{
combos::{throttle_partition::throttle_partition, unique_partitions::unique_partitions},
commit::{mock::MockCommit, Commit},
partition_done_sink::{mock::MockPartitionDoneSink, PartitionDoneSink},
use crate::{
Commit, MockCommit, MockPartitionDoneSink, MockPartitionsSource, PartitionDoneSink,
PartitionsSource,
};
use super::{throttle_partition::throttle_partition, unique_partitions::unique_partitions};
#[tokio::test]
async fn test_unique_and_throttle() {
let inner_source = Arc::new(MockPartitionsSource::new(vec![
@ -44,9 +44,14 @@ async fn test_unique_and_throttle() {
commit
.commit(PartitionId::new(1), &[], &[], &[], CompactionLevel::Initial)
.await;
sink.record(PartitionId::new(1), Ok(())).await;
sink.record(PartitionId::new(2), Ok(())).await;
.await
.expect("commit failed");
sink.record(PartitionId::new(1), Ok(()))
.await
.expect("record failed");
sink.record(PartitionId::new(2), Ok(()))
.await
.expect("record failed");
inner_source.set(vec![
PartitionId::new(1),


@ -1,19 +1,17 @@
//! Throttle partitions that receive no commits.
use std::{
collections::HashMap,
fmt::Display,
sync::{Arc, Mutex},
time::Duration,
};
use std::{collections::HashMap, fmt::Display, sync::Arc, time::Duration};
use async_trait::async_trait;
use compactor_scheduler::PartitionsSource;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use iox_time::{Time, TimeProvider};
use parking_lot::Mutex;
use crate::components::{commit::Commit, partition_done_sink::PartitionDoneSink};
use crate::{
local_scheduler::partition_done_sink::DynError, Commit, CommitError, PartitionDoneSink,
PartitionDoneSinkError, PartitionsSource,
};
/// Ensures that partitions that do not receive any commits are throttled.
///
@ -54,8 +52,8 @@ use crate::components::{commit::Commit, partition_done_sink::PartitionDoneSink};
/// concurrency of this bypass can be controlled via `bypass_concurrency`.
///
/// This setup relies on a fact that it does not process duplicate [`PartitionId`]. You may use
/// [`unique_partitions`](crate::components::combos::unique_partitions::unique_partitions) to achieve that.
pub fn throttle_partition<T1, T2, T3>(
/// [`unique_partitions`](super::unique_partitions::unique_partitions) to achieve that.
pub(crate) fn throttle_partition<T1, T2, T3>(
source: T1,
commit: T2,
sink: T3,
@ -94,6 +92,14 @@ where
(source, commit, sink)
}
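The behaviour described above can be sketched as a test in the style of the `tests` module below (assuming the same imports; the test name is illustrative):

#[tokio::test]
async fn sketch_throttles_partition_without_commit() {
    let (source, _commit, sink) = throttle_partition(
        MockPartitionsSource::new(vec![PartitionId::new(1)]),
        MockCommit::new(),
        MockPartitionDoneSink::new(),
        Arc::new(MockProvider::new(Time::MIN)),
        Duration::from_secs(60),
        1,
    );
    assert_eq!(source.fetch().await, vec![PartitionId::new(1)]);
    // no commit happened for partition 1, so recording it as done throttles it
    sink.record(PartitionId::new(1), Ok(()))
        .await
        .expect("record failed");
    // within the 60s throttle window the partition is filtered out of the next fetch
    assert!(source.fetch().await.is_empty());
}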
/// Error returned by throttler abstractions.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed uniqueness check.
#[error("Unknown or already done partition: {0}")]
Uniqueness(PartitionId),
}
#[derive(Debug, Default)]
struct State {
// Value is "true" once a commit has been received for the partition, and "false" otherwise.
@ -107,7 +113,7 @@ struct State {
type SharedState = Arc<Mutex<State>>;
#[derive(Debug)]
pub struct ThrottlePartitionsSourceWrapper<T1, T2>
pub(crate) struct ThrottlePartitionsSourceWrapper<T1, T2>
where
T1: PartitionsSource,
T2: PartitionDoneSink,
@ -139,7 +145,7 @@ where
let res = self.inner_source.fetch().await;
let (pass, throttle) = {
let mut guard = self.state.lock().expect("not poisoned");
let mut guard = self.state.lock();
// ensure that in-flight data is non-overlapping
for id in &res {
@ -177,10 +183,11 @@ where
(pass, throttle)
};
futures::stream::iter(throttle)
// pass through the removal from tracking, since it was throttled in this fetch() call
let _ = futures::stream::iter(throttle)
.map(|id| self.inner_sink.record(id, Ok(())))
.buffer_unordered(self.sink_concurrency)
.collect::<()>()
.try_collect::<()>()
.await;
pass
@ -188,7 +195,7 @@ where
}
#[derive(Debug)]
pub struct ThrottleCommitWrapper<T>
pub(crate) struct ThrottleCommitWrapper<T>
where
T: Commit,
{
@ -217,9 +224,9 @@ where
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
) -> Result<Vec<ParquetFileId>, CommitError> {
let known = {
let mut guard = self.state.lock().expect("not poisoned");
let mut guard = self.state.lock();
match guard.in_flight.get_mut(&partition_id) {
Some(val) => {
*val = true;
@ -229,10 +236,9 @@ where
}
};
// perform check when NOT holding the mutex to not poison it
assert!(
known,
"Unknown or already done partition in commit: {partition_id}"
);
if !known {
return Err(Error::Uniqueness(partition_id).into());
}
self.inner
.commit(partition_id, delete, upgrade, create, target_level)
@ -241,7 +247,7 @@ where
}
#[derive(Debug)]
pub struct ThrottlePartitionDoneSinkWrapper<T>
pub(crate) struct ThrottlePartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
@ -268,10 +274,10 @@ where
async fn record(
&self,
partition: PartitionId,
res: Result<(), Box<dyn std::error::Error + Send + Sync>>,
) {
res: Result<(), DynError>,
) -> Result<(), PartitionDoneSinkError> {
let known = {
let mut guard = self.state.lock().expect("not poisoned");
let mut guard = self.state.lock();
match guard.in_flight.remove(&partition) {
Some(val) => {
if !val {
@ -285,23 +291,21 @@ where
}
};
// perform check when NOT holding the mutex to not poison it
assert!(
known,
"Unknown or already done partition in partition done sink: {partition}"
);
if !known {
return Err(Error::Uniqueness(partition).into());
}
self.inner.record(partition, res).await;
self.inner.record(partition, res).await
}
}
#[cfg(test)]
mod tests {
use compactor_scheduler::MockPartitionsSource;
use assert_matches::assert_matches;
use iox_time::MockProvider;
use crate::components::{
commit::mock::{CommitHistoryEntry, MockCommit},
partition_done_sink::mock::MockPartitionDoneSink,
use crate::{
commit::mock::CommitHistoryEntry, MockCommit, MockPartitionDoneSink, MockPartitionsSource,
};
use super::*;
@ -357,14 +361,20 @@ mod tests {
// commit
commit
.commit(PartitionId::new(1), &[], &[], &[], CompactionLevel::Initial)
.await;
.await
.expect("commit failed");
commit
.commit(PartitionId::new(2), &[], &[], &[], CompactionLevel::Initial)
.await;
.await
.expect("commit failed");
// record
sink.record(PartitionId::new(1), Ok(())).await;
sink.record(PartitionId::new(3), Ok(())).await;
sink.record(PartitionId::new(1), Ok(()))
.await
.expect("record failed");
sink.record(PartitionId::new(3), Ok(()))
.await
.expect("record failed");
assert_eq!(
inner_sink.results(),
HashMap::from([(PartitionId::new(1), Ok(())), (PartitionId::new(3), Ok(())),]),
@ -405,9 +415,11 @@ mod tests {
// record
// can still finish partition 2 and 4
sink.record(PartitionId::new(2), Err(String::from("foo").into()))
.await;
.await
.expect("record failed");
sink.record(PartitionId::new(4), Err(String::from("bar").into()))
.await;
.await
.expect("record failed");
assert_eq!(
inner_sink.results(),
HashMap::from([
@ -457,8 +469,7 @@ mod tests {
}
#[tokio::test]
#[should_panic(expected = "Unknown or already done partition in commit: 1")]
async fn test_panic_commit_unknown() {
async fn test_err_commit_unknown() {
let (source, commit, sink) = throttle_partition(
MockPartitionsSource::new(vec![PartitionId::new(1)]),
MockCommit::new(),
@ -469,14 +480,18 @@ mod tests {
);
source.fetch().await;
sink.record(PartitionId::new(1), Ok(())).await;
commit
sink.record(PartitionId::new(1), Ok(()))
.await
.expect("record failed");
assert_matches!(
commit
.commit(PartitionId::new(1), &[], &[], &[], CompactionLevel::Initial)
.await;
.await,
Err(CommitError::ThrottlerError(Error::Uniqueness(res))) if res == PartitionId::new(1)
);
}
#[tokio::test]
#[should_panic(expected = "Unknown or already done partition in partition done sink: 1")]
async fn test_panic_sink_unknown() {
let (source, _commit, sink) = throttle_partition(
MockPartitionsSource::new(vec![PartitionId::new(1)]),
@ -488,10 +503,20 @@ mod tests {
);
source.fetch().await;
sink.record(PartitionId::new(1), Ok(())).await;
sink.record(PartitionId::new(1), Ok(())).await;
sink.record(PartitionId::new(1), Ok(()))
.await
.expect("record failed");
let err = sink.record(PartitionId::new(1), Ok(())).await;
assert_matches!(
err,
Err(PartitionDoneSinkError::Throttler(Error::Uniqueness(partition))) if partition == PartitionId::new(1),
"fails because partition 1 is already done"
);
}
// TODO: remove last legacy panic from scheduler
// Requires change of Scheduler.get_jobs() API to a Result<Vec<CompactionJob>>
#[tokio::test]
#[should_panic(expected = "Partition already in-flight: 1")]
async fn test_panic_duplicate_in_flight() {


@ -1,17 +1,13 @@
//! Ensure that partitions flowing through the pipeline are unique.
use std::{
collections::HashSet,
fmt::Display,
sync::{Arc, Mutex},
};
use std::{collections::HashSet, fmt::Display, sync::Arc};
use async_trait::async_trait;
use compactor_scheduler::PartitionsSource;
use data_types::PartitionId;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use parking_lot::Mutex;
use crate::components::partition_done_sink::PartitionDoneSink;
use crate::{local_scheduler::partition_done_sink::DynError, PartitionDoneSink, PartitionsSource};
/// Ensures that a unique set of partitions is flowing through the critical section of the compactor pipeline.
///
@ -32,7 +28,7 @@ use crate::components::partition_done_sink::PartitionDoneSink;
///
/// | Step | Name | Type | Description |
/// | ---- | --------------------- | ----------------------------------------------------------- | ----------- |
/// | 1 | **Actual source** | `inner_source`/`T1`/[`PartitionsSource`], wrapped | This is the actual source, e.g. a [schedule](crate::components::partitions_source::scheduled::ScheduledPartitionsSource) |
/// | 1 | **Actual source** | `inner_source`/`T1`/[`PartitionsSource`], wrapped | This is the actual source, e.g. a [schedule](crate::PartitionsSource) |
/// | 2 | **Unique IDs source** | [`UniquePartionsSourceWrapper`], wraps `inner_source`/`T1` | Outputs that [`PartitionId`]s from the `inner_source` but filters out partitions that have not yet reached the uniqueness sink (step 4) |
/// | 3 | **Critical section** | -- | Here it is always ensured that a single [`PartitionId`] does NOT occur more than once. |
/// | 4 | **Unique IDs sink** | [`UniquePartitionDoneSinkWrapper`], wraps `inner_sink`/`T2` | Observes incoming IDs and removes them from the filter applied in step 2. |
@ -41,7 +37,7 @@ use crate::components::partition_done_sink::PartitionDoneSink;
/// Note that partitions filtered out by [`UniquePartionsSourceWrapper`] will directly be forwarded to `inner_sink`. No
/// partition is ever lost. This means that `inner_source` and `inner_sink` can perform proper accounting. The
/// concurrency of this bypass can be controlled via `bypass_concurrency`.
pub fn unique_partitions<T1, T2>(
pub(crate) fn unique_partitions<T1, T2>(
inner_source: T1,
inner_sink: T2,
bypass_concurrency: usize,
@ -68,10 +64,18 @@ where
(source, sink)
}
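The same idea as a test-style sketch (again assuming the imports of the `tests` module below; the test name is illustrative): a partition stays filtered out of subsequent fetches until it has been recorded through the wrapped sink.

#[tokio::test]
async fn sketch_filters_in_flight_partition() {
    let (source, sink) = unique_partitions(
        MockPartitionsSource::new(vec![PartitionId::new(1)]),
        MockPartitionDoneSink::new(),
        1,
    );
    assert_eq!(source.fetch().await, vec![PartitionId::new(1)]);
    // partition 1 is still in flight, so it is filtered out (and bypassed to the inner sink)
    assert!(source.fetch().await.is_empty());
    // recording it through the wrapper releases it for future fetches
    sink.record(PartitionId::new(1), Ok(()))
        .await
        .expect("record failed");
    assert_eq!(source.fetch().await, vec![PartitionId::new(1)]);
}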
/// Error returned by uniqueness abstractions.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// Failed uniqueness check.
#[error("Unknown or already done partition: {0}")]
Uniqueness(PartitionId),
}
type InFlight = Arc<Mutex<HashSet<PartitionId>>>;
#[derive(Debug)]
pub struct UniquePartionsSourceWrapper<T1, T2>
pub(crate) struct UniquePartionsSourceWrapper<T1, T2>
where
T1: PartitionsSource,
T2: PartitionDoneSink,
@ -102,7 +106,7 @@ where
let res = self.inner_source.fetch().await;
let (unique, duplicates) = {
let mut guard = self.in_flight.lock().expect("not poisoned");
let mut guard = self.in_flight.lock();
let mut unique = Vec::with_capacity(res.len());
let mut duplicates = Vec::with_capacity(res.len());
@ -117,10 +121,11 @@ where
(unique, duplicates)
};
futures::stream::iter(duplicates)
// pass through the removal from tracking, since it was marked as duplicate in this fetch() call
let _ = futures::stream::iter(duplicates)
.map(|id| self.inner_sink.record(id, Ok(())))
.buffer_unordered(self.sink_concurrency)
.collect::<()>()
.try_collect::<()>()
.await;
unique
@ -128,7 +133,7 @@ where
}
#[derive(Debug)]
pub struct UniquePartitionDoneSinkWrapper<T>
pub(crate) struct UniquePartitionDoneSinkWrapper<T>
where
T: PartitionDoneSink,
{
@ -153,17 +158,16 @@ where
async fn record(
&self,
partition: PartitionId,
res: Result<(), Box<dyn std::error::Error + Send + Sync>>,
) {
res: Result<(), DynError>,
) -> Result<(), crate::PartitionDoneSinkError> {
let existing = {
let mut guard = self.in_flight.lock().expect("not poisoned");
let mut guard = self.in_flight.lock();
guard.remove(&partition)
};
// perform check when NOT holding the mutex to not poison it
assert!(
existing,
"Unknown or already done partition in sink: {partition}"
);
if !existing {
return Err(Error::Uniqueness(partition).into());
}
// perform inner last, because the wrapping order is:
//
@ -172,7 +176,7 @@ where
// - ...
// - unique sink
// - wrapped sink
self.inner.record(partition, res).await;
self.inner.record(partition, res).await
}
}
@ -180,9 +184,9 @@ where
mod tests {
use std::collections::HashMap;
use compactor_scheduler::MockPartitionsSource;
use assert_matches::assert_matches;
use crate::components::partition_done_sink::mock::MockPartitionDoneSink;
use crate::{MockPartitionDoneSink, MockPartitionsSource, PartitionDoneSinkError};
use super::*;
@ -227,8 +231,12 @@ mod tests {
);
// record
sink.record(PartitionId::new(1), Ok(())).await;
sink.record(PartitionId::new(2), Ok(())).await;
sink.record(PartitionId::new(1), Ok(()))
.await
.expect("record failed");
sink.record(PartitionId::new(2), Ok(()))
.await
.expect("record failed");
assert_eq!(
inner_sink.results(),
@ -264,7 +272,8 @@ mod tests {
// record
sink.record(PartitionId::new(1), Err(String::from("foo").into()))
.await;
.await
.expect("record failed");
assert_eq!(
inner_sink.results(),
@ -291,7 +300,6 @@ mod tests {
}
#[tokio::test]
#[should_panic(expected = "Unknown or already done partition in sink: 1")]
async fn test_panic_sink_unknown() {
let (source, sink) = unique_partitions(
MockPartitionsSource::new(vec![PartitionId::new(1)]),
@ -301,7 +309,11 @@ mod tests {
let ids = source.fetch().await;
assert_eq!(ids.len(), 1);
let id = ids[0];
sink.record(id, Ok(())).await;
sink.record(id, Ok(())).await;
sink.record(id, Ok(())).await.expect("record failed");
assert_matches!(
sink.record(id, Ok(())).await,
Err(PartitionDoneSinkError::UniquePartitions(Error::Uniqueness(partition))) if partition == PartitionId::new(1),
"fails because partition 1 is already done"
);
}
}


@ -5,18 +5,16 @@ use backoff::{Backoff, BackoffConfig};
use data_types::PartitionId;
use iox_catalog::interface::Catalog;
use crate::error::DynError;
use super::PartitionDoneSink;
use super::{DynError, Error, PartitionDoneSink};
#[derive(Debug)]
pub struct CatalogPartitionDoneSink {
pub(crate) struct CatalogPartitionDoneSink {
backoff_config: BackoffConfig,
catalog: Arc<dyn Catalog>,
}
impl CatalogPartitionDoneSink {
pub fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
pub(crate) fn new(backoff_config: BackoffConfig, catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config,
catalog,
@ -32,7 +30,7 @@ impl Display for CatalogPartitionDoneSink {
#[async_trait]
impl PartitionDoneSink for CatalogPartitionDoneSink {
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) {
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error> {
if let Err(e) = res {
let msg = e.to_string();
@ -47,7 +45,8 @@ impl PartitionDoneSink for CatalogPartitionDoneSink {
.await
})
.await
.expect("retry forever");
.map_err(|e| Error::Catalog(e.to_string()))?;
}
Ok(())
}
}


@ -0,0 +1,81 @@
use std::{collections::HashMap, fmt::Display};
use async_trait::async_trait;
use data_types::PartitionId;
use parking_lot::Mutex;
use super::{DynError, Error, PartitionDoneSink};
/// Mock for [`PartitionDoneSink`].
#[derive(Debug, Default)]
pub(crate) struct MockPartitionDoneSink {
last: Mutex<HashMap<PartitionId, Result<(), String>>>,
}
impl MockPartitionDoneSink {
/// Create new mock.
pub(crate) fn new() -> Self {
Self::default()
}
/// Get the last recorded results.
#[allow(dead_code)] // used for testing
pub(crate) fn results(&self) -> HashMap<PartitionId, Result<(), String>> {
self.last.lock().clone()
}
}
impl Display for MockPartitionDoneSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "mock")
}
}
#[async_trait]
impl PartitionDoneSink for MockPartitionDoneSink {
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error> {
self.last
.lock()
.insert(partition, res.map_err(|e| e.to_string()));
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_display() {
assert_eq!(MockPartitionDoneSink::new().to_string(), "mock",);
}
#[tokio::test]
async fn test_record() {
let sink = MockPartitionDoneSink::new();
assert_eq!(sink.results(), HashMap::default(),);
sink.record(PartitionId::new(1), Err("msg 1".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(2), Err("msg 2".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(1), Err("msg 3".into()))
.await
.expect("record failed");
sink.record(PartitionId::new(3), Ok(()))
.await
.expect("record failed");
assert_eq!(
sink.results(),
HashMap::from([
(PartitionId::new(1), Err(String::from("msg 3"))),
(PartitionId::new(2), Err(String::from("msg 2"))),
(PartitionId::new(3), Ok(())),
]),
);
}
}


@ -0,0 +1,48 @@
pub(crate) mod catalog;
pub(crate) mod mock;
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::PartitionId;
/// Dynamic error type that is provided from the Compactor.
pub(crate) type DynError = Box<dyn std::error::Error + Send + Sync>;
/// Error returned by [`PartitionDoneSink`] implementations.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
/// PartitionDoneSink failed during connection with catalog
#[error("Failure during catalog communication: {0}")]
Catalog(String),
/// PartitionDoneSink failed because of an error in the unique partitions source wrapper
#[error("Failure in unique_partitions: {0}")]
UniquePartitions(#[from] crate::UniquePartitionsError),
/// PartitionDoneSink failed because of an error in the throttler
#[error("Failure in throttler: {0}")]
Throttler(#[from] crate::ThrottleError),
}
/// Records "partition is done" status for given partition.
#[async_trait]
pub(crate) trait PartitionDoneSink: Debug + Display + Send + Sync {
/// Record "partition is done" status for given partition.
///
/// This method should retry.
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error>;
}
#[async_trait]
impl<T> PartitionDoneSink for Arc<T>
where
T: PartitionDoneSink + ?Sized,
{
async fn record(&self, partition: PartitionId, res: Result<(), DynError>) -> Result<(), Error> {
self.as_ref().record(partition, res).await
}
}


@ -1,4 +1,4 @@
use std::{fmt::Display, sync::Arc, sync::Mutex, time::Duration};
use std::{fmt::Display, sync::Arc, time::Duration};
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
@ -6,6 +6,7 @@ use data_types::PartitionId;
use iox_catalog::interface::Catalog;
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::info;
use parking_lot::Mutex;
use crate::PartitionsSource;
@ -73,7 +74,7 @@ impl PartitionsSource for CatalogToCompactPartitionsSource {
// scope the locking to just maintenance of last_maximum_time, not the query
{
// we're going to check the time range we'd like to query for against the end time of the last query.
let mut last = self.last_maximum_time.lock().unwrap();
let mut last = self.last_maximum_time.lock();
// query for partitions with activity since the last query. We shouldn't query for a time range
// we've already covered. So if the prior query was 2m ago, and the query covered 10m, ending at


@ -1,10 +1,11 @@
use std::{
fmt::{Debug, Display},
sync::{Arc, Mutex},
sync::Arc,
};
use async_trait::async_trait;
use data_types::PartitionId;
use parking_lot::Mutex;
/// A source of partitions, noted by [`PartitionId`](data_types::PartitionId), that may potentially need compacting.
#[async_trait]
@ -48,7 +49,7 @@ mod mock {
/// Set PartitionIds for MockPartitionsSource.
#[allow(dead_code)] // not used anywhere
pub fn set(&self, partitions: Vec<PartitionId>) {
*self.partitions.lock().expect("not poisoned") = partitions;
*self.partitions.lock() = partitions;
}
}
@ -61,7 +62,7 @@ mod mock {
#[async_trait]
impl PartitionsSource for MockPartitionsSource {
async fn fetch(&self) -> Vec<PartitionId> {
self.partitions.lock().expect("not poisoned").clone()
self.partitions.lock().clone()
}
}
}


@ -1,10 +1,13 @@
use std::fmt::{Debug, Display};
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::PartitionId;
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use uuid::Uuid;
use crate::LocalSchedulerConfig;
use crate::{CommitWrapper, ErrorKind, LocalSchedulerConfig, PartitionsSourceConfig};
/// Scheduler configuration.
#[derive(Debug, Clone)]
@ -13,6 +16,19 @@ pub enum SchedulerConfig {
Local(LocalSchedulerConfig),
}
impl SchedulerConfig {
/// Create new [`LocalScheduler`](crate::LocalScheduler) config with a [`CommitWrapper`].
///
/// This is useful for testing.
pub fn new_local_with_wrapper(commit_wrapper: Arc<dyn CommitWrapper>) -> Self {
Self::Local(LocalSchedulerConfig {
shard_config: None,
partitions_source_config: PartitionsSourceConfig::default(),
commit_wrapper: Some(commit_wrapper),
})
}
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self::Local(LocalSchedulerConfig::default())
@ -23,18 +39,28 @@ impl std::fmt::Display for SchedulerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SchedulerConfig::Local(LocalSchedulerConfig {
commit_wrapper,
shard_config,
partitions_source_config: _,
}) => match &shard_config {
None => write!(f, "local_compaction_scheduler"),
Some(shard_config) => write!(f, "local_compaction_scheduler({shard_config})",),
}) => match (&shard_config, commit_wrapper) {
(None, None) => write!(f, "local_compaction_scheduler_cfg"),
(Some(shard_config), None) => {
write!(f, "local_compaction_scheduler_cfg({shard_config})",)
}
(Some(shard_config), Some(_)) => write!(
f,
"local_compaction_scheduler_cfg({shard_config},commit_wrapper=Some)",
),
(None, Some(_)) => {
write!(f, "local_compaction_scheduler_cfg(commit_wrapper=Some)",)
}
},
}
}
}
/// Job assignment for a given partition.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct CompactionJob {
#[allow(dead_code)]
/// Unique identifier for this job.
@ -54,9 +80,144 @@ impl CompactionJob {
}
}
/// Commit update for a given partition.
#[derive(Debug)]
pub struct CommitUpdate {
/// Partition to be updated.
pub(crate) partition_id: PartitionId,
/// Files to be deleted.
pub(crate) delete: Vec<ParquetFile>,
/// Files to be upgraded.
pub(crate) upgrade: Vec<ParquetFile>,
/// Target level for upgraded files.
pub(crate) target_level: CompactionLevel,
/// Files to be created.
pub(crate) create: Vec<ParquetFileParams>,
}
impl CommitUpdate {
/// Create new commit update.
pub fn new(
partition_id: PartitionId,
delete: Vec<ParquetFile>,
upgrade: Vec<ParquetFile>,
create: Vec<ParquetFileParams>,
target_level: CompactionLevel,
) -> Self {
Self {
partition_id,
delete,
upgrade,
target_level,
create,
}
}
}
/// Status.
#[derive(Debug)]
pub enum CompactionJobStatusVariant {
/// Updates associated with ongoing compaction job.
Update(CommitUpdate),
/// Ongoing compaction job error.
///
/// These errors are not fatal, as some of the compaction job branches may succeed.
Error(ErrorKind),
}
/// Status ([`CompactionJobStatusVariant`]) associated with a [`CompactionJob`].
#[derive(Debug)]
pub struct CompactionJobStatus {
/// Job.
pub job: CompactionJob,
/// Status.
pub status: CompactionJobStatusVariant,
}
/// Response to a [`CompactionJobStatus`].
#[derive(Debug)]
pub enum CompactionJobStatusResponse {
/// Acknowledge receipt of a [`CompactionJobStatusVariant::Error`] request.
Ack,
/// IDs of the created files that were processed.
///
/// This is the response to a [`CompactionJobStatusVariant::Update`] request.
CreatedParquetFiles(Vec<ParquetFileId>),
}
/// Reason for skipping a partition.
#[derive(Debug)]
pub struct SkipReason(pub String);
/// Options for ending a compaction job.
#[derive(Debug)]
pub enum CompactionJobEndVariant {
/// Request to skip partition.
RequestToSkip(SkipReason),
/// Compaction job is complete.
Complete,
}
/// End action ([`CompactionJobEndVariant`]) associated with a [`CompactionJob`].
#[derive(Debug)]
pub struct CompactionJobEnd {
/// Job.
pub job: CompactionJob,
/// End action.
pub end_action: CompactionJobEndVariant,
}
/// Core trait used for all schedulers.
#[async_trait]
pub trait Scheduler: Send + Sync + Debug + Display {
/// Get partitions to be compacted.
async fn get_jobs(&self) -> Vec<CompactionJob>;
/// Compactors call this function to send an update to the scheduler
/// on the status of a compaction job that the compactor was assigned.
///
/// Compactor sends a [`CompactionJobStatus`].
/// Scheduler returns a [`CompactionJobStatusResponse`].
///
/// Compactor can send multiple [`CompactionJobStatus`] requests for the same job.
async fn update_job_status(
&self,
job_status: CompactionJobStatus,
) -> Result<CompactionJobStatusResponse, Box<dyn std::error::Error + Send + Sync>>;
/// Compactor sends a [`CompactionJobEnd`], to end a job.
///
/// This method can only be called once per job.
async fn end_job(
&self,
end_action: CompactionJobEnd,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}
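Taken together, the trait describes a small compactor-side protocol: lease jobs via `get_jobs()`, report progress via `update_job_status()`, and finish each job with exactly one `end_job()` call. A hedged sketch of that flow (the function name and the upgrade-only commit are illustrative; imports follow the integration-test helpers later in this diff):

use std::sync::Arc;

use compactor_scheduler::{
    CommitUpdate, CompactionJob, CompactionJobEnd, CompactionJobEndVariant, CompactionJobStatus,
    CompactionJobStatusResponse, CompactionJobStatusVariant, Scheduler,
};
use data_types::{CompactionLevel, ParquetFile};

async fn compact_one(
    scheduler: Arc<dyn Scheduler>,
    job: CompactionJob,
    file_to_upgrade: ParquetFile,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // report an upgrade-only commit for this partition
    let update = CommitUpdate::new(
        job.partition_id,
        vec![],
        vec![file_to_upgrade],
        vec![],
        CompactionLevel::Final,
    );
    let response = scheduler
        .update_job_status(CompactionJobStatus {
            job: job.clone(),
            status: CompactionJobStatusVariant::Update(update),
        })
        .await?;
    // an upgrade-only commit creates no new parquet files
    assert!(matches!(
        response,
        CompactionJobStatusResponse::CreatedParquetFiles(ref files) if files.is_empty()
    ));
    // exactly one end_job() call per job
    scheduler
        .end_job(CompactionJobEnd {
            job,
            end_action: CompactionJobEndVariant::Complete,
        })
        .await
}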
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::Commit;
use super::*;
#[test]
fn test_cfg_display_new_local_with_wrapper() {
#[derive(Debug)]
struct MockCommitWrapper;
impl CommitWrapper for MockCommitWrapper {
fn wrap(&self, commit: Arc<dyn Commit>) -> Arc<dyn Commit> {
commit
}
}
let config = SchedulerConfig::new_local_with_wrapper(Arc::new(MockCommitWrapper));
assert_eq!(
config.to_string(),
"local_compaction_scheduler_cfg(commit_wrapper=Some)"
);
}
}


@ -0,0 +1,129 @@
//! Helpers for testing.
//!
//! Usable for any Scheduler API (remote or local).
use std::sync::Arc;
use assert_matches::assert_matches;
use compactor_scheduler::{
CommitUpdate, CompactionJob, CompactionJobEnd, CompactionJobEndVariant, CompactionJobStatus,
CompactionJobStatusResponse, CompactionJobStatusVariant, ErrorKind, Scheduler, SkipReason,
};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams};
pub async fn can_do_replacement_commit(
scheduler: Arc<dyn Scheduler>,
job: CompactionJob,
deleted_parquet_files: Vec<ParquetFile>,
created_parquet_files: Vec<ParquetFileParams>,
) {
let commit_update = CommitUpdate::new(
job.partition_id,
deleted_parquet_files,
vec![],
created_parquet_files,
CompactionLevel::Final,
);
let res = scheduler
.update_job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await;
assert_matches!(
res,
Ok(CompactionJobStatusResponse::CreatedParquetFiles(files)) if files.len() == 1,
"expected replacement commit to succeed with exactly one file to be created, got {:?}", res
);
}
pub async fn can_do_upgrade_commit(
scheduler: Arc<dyn Scheduler>,
job: CompactionJob,
existing_parquet_file: ParquetFile,
) {
let commit_update = CommitUpdate::new(
job.partition_id,
vec![],
vec![existing_parquet_file],
vec![],
CompactionLevel::Final,
);
let res = scheduler
.update_job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Update(commit_update),
})
.await;
assert_matches!(
res,
Ok(CompactionJobStatusResponse::CreatedParquetFiles(files)) if files.is_empty(),
"expected upgrade commit to succeed with no files to be created, got {:?}", res
);
}
pub async fn can_send_error(scheduler: Arc<dyn Scheduler>, job: CompactionJob) {
let res = scheduler
.update_job_status(CompactionJobStatus {
job,
status: CompactionJobStatusVariant::Error(ErrorKind::Unknown(
"error reported (without partition-skip request)".into(),
)),
})
.await;
assert_matches!(
res,
Ok(CompactionJobStatusResponse::Ack),
"expected error to be accepted, got {:?}",
res
);
}
pub async fn can_do_complete(scheduler: Arc<dyn Scheduler>, job: CompactionJob) {
let res = scheduler
.end_job(CompactionJobEnd {
job,
end_action: CompactionJobEndVariant::Complete,
})
.await;
assert_matches!(
res,
Ok(()),
"expected job to be marked as complete, got {:?}",
res
);
}
pub async fn can_do_skip_request(scheduler: Arc<dyn Scheduler>, job: CompactionJob) {
let res = scheduler
.end_job(CompactionJobEnd {
job,
end_action: CompactionJobEndVariant::RequestToSkip(SkipReason(
"error reason given for request the skip".into(),
)),
})
.await;
assert_matches!(
res,
Ok(()),
"expected job to be marked as skipped, got {:?}",
res
);
}
pub async fn assert_all_partitions_leased(scheduler: Arc<dyn Scheduler>) {
let jobs = scheduler.get_jobs().await;
assert_matches!(
jobs[..],
[],
"expected no partition found (all partitions leased), but instead got {:?}",
jobs,
);
}


@ -0,0 +1,213 @@
use std::sync::Arc;
use assert_matches::assert_matches;
use compactor_scheduler::{CompactionJob, CompactionJobEnd, CompactionJobEndVariant, SkipReason};
use data_types::SkippedCompaction;
use super::{super::helpers, TestLocalScheduler};
#[tokio::test]
async fn test_must_end_job_to_make_partition_reavailable() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let mut jobs = scheduler.get_jobs().await;
let (existing_1, _) = test_scheduler.get_seeded_files();
// upgrade commit (to avoid throttling)
helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await;
// lease is still in place (even after commits)
helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await;
// mark job as complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: partition is available again
jobs = scheduler.get_jobs().await;
let expected_partition = test_scheduler.get_partition_id();
assert_matches!(
jobs[..],
[CompactionJob { partition_id, .. }] if partition_id == expected_partition,
"expect partition is available again, instead found {:?}", jobs
);
}
#[tokio::test]
async fn test_job_ended_without_commits_should_be_throttled() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let mut jobs = scheduler.get_jobs().await;
// no commit
// lease is still in place
helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await;
// mark job as complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: partition is throttled
jobs = scheduler.get_jobs().await;
assert_matches!(
jobs[..],
[],
"expect partition should be throttled, instead found {:?}",
jobs
);
}
#[tokio::test]
async fn test_error_reporting_is_not_sufficient_to_avoid_throttling() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let mut jobs = scheduler.get_jobs().await;
// no commit
// only error reporting
helpers::can_send_error(Arc::clone(&scheduler), jobs[0].clone()).await;
// lease is still in place
helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await;
// mark job as complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: partition is throttled
jobs = scheduler.get_jobs().await;
assert_matches!(
jobs[..],
[],
"expect partition should be throttled, instead found {:?}",
jobs
);
}
#[tokio::test]
async fn test_after_complete_job_cannot_end_again() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
// mark job as complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: end_job complete => will fail, since job is already complete
let res = scheduler
.end_job(CompactionJobEnd {
job: jobs[0].clone(),
end_action: CompactionJobEndVariant::Complete,
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("Unknown or already done partition:"),
"should error if attempt complete after complete, instead found {:?}", res
);
// TEST: end_job skip request => will fail, since job is already complete
let res = scheduler
.end_job(CompactionJobEnd {
job: jobs[0].clone(),
end_action: CompactionJobEndVariant::RequestToSkip(SkipReason(
"skip this partition".to_string(),
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("Unknown or already done partition:"),
"should error if attempt skip partition after complete, instead found {:?}", res
);
}
#[tokio::test]
async fn test_after_skip_request_cannot_end_again() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
// mark partition as skipped (also completes the job)
helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: end_job complete => will fail, since job is already complete
let res = scheduler
.end_job(CompactionJobEnd {
job: jobs[0].clone(),
end_action: CompactionJobEndVariant::Complete,
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("Unknown or already done partition:"),
"should error if attempt complete after skip request, instead found {:?}", res
);
// TEST: end_job skip request => will fail, since job is already complete
let res = scheduler
.end_job(CompactionJobEnd {
job: jobs[0].clone(),
end_action: CompactionJobEndVariant::RequestToSkip(SkipReason(
"skip this partition".to_string(),
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("Unknown or already done partition:"),
"should error if attempt skip request after skip request, instead found {:?}", res
);
}
#[tokio::test]
async fn test_what_skip_request_does() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let expected_partition = test_scheduler.get_partition_id();
let scheduler = Arc::clone(&test_scheduler.scheduler);
let catalog = test_scheduler.catalog.catalog();
let mut jobs = scheduler.get_jobs().await;
let (existing_1, _) = test_scheduler.get_seeded_files();
// upgrade commit (to avoid throttling as a confounding variable)
helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await;
helpers::assert_all_partitions_leased(Arc::clone(&scheduler)).await;
// mark partition as skipped (also completes the job)
helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: partition still appears in get_jobs()
// todo: plan to update this behavior. Move compactor skip-filtering to scheduler.
jobs = scheduler.get_jobs().await;
assert_matches!(
jobs[..],
[CompactionJob { partition_id, .. }] if partition_id == expected_partition,
"expect partition is still available in get_jobs() (because skip-filtering is compactor-side), instead found {:?}", jobs
);
// but partition in catalog is marked as skipped
// will be consumed in compactor `SkippedCompactionsSource`
let catalog_marked_as_skipped = catalog
.repositories()
.await
.partitions()
.get_in_skipped_compaction(expected_partition)
.await;
assert_matches!(
catalog_marked_as_skipped,
Ok(Some(SkippedCompaction { partition_id, .. })) if partition_id == expected_partition,
"expect partition should be marked as skipped in catalog, instead found {:?}", catalog_marked_as_skipped
);
}


@ -0,0 +1,104 @@
use std::sync::Arc;
use assert_matches::assert_matches;
use compactor_scheduler::{create_test_scheduler, CompactionJob};
use data_types::PartitionId;
use iox_tests::TestCatalog;
use super::{super::helpers, TestLocalScheduler};
#[tokio::test]
async fn test_mocked_partition_ids() {
let catalog = TestCatalog::new();
let partitions = vec![PartitionId::new(0), PartitionId::new(1234242)];
let scheduler = create_test_scheduler(
catalog.catalog(),
Arc::clone(&catalog.time_provider()),
Some(partitions.clone()),
);
let mut result = scheduler
.get_jobs()
.await
.iter()
.map(|j| j.partition_id)
.collect::<Vec<PartitionId>>();
result.sort();
assert_eq!(
result,
partitions,
"expected the partitions provided to create_test_scheduler() should be returned on `get_jobs()`, instead got `{:?}`", result
);
}
#[tokio::test]
async fn test_returns_hot_partition() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let jobs = test_scheduler.scheduler.get_jobs().await;
test_scheduler.assert_matches_seeded_hot_partition(&jobs);
}
#[tokio::test]
async fn test_will_not_fetch_leased_partitions() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
test_scheduler.assert_matches_seeded_hot_partition(&jobs);
helpers::assert_all_partitions_leased(scheduler).await;
}
#[tokio::test]
async fn test_can_refetch_previously_completed_partitions() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let expected_partition = test_scheduler.get_partition_id();
let scheduler = Arc::clone(&test_scheduler.scheduler);
let mut jobs = scheduler.get_jobs().await;
let (existing_1, _) = test_scheduler.get_seeded_files();
// make commit, so not throttled
helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await;
// complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: can refetch
jobs = scheduler.get_jobs().await;
assert_matches!(
jobs[..],
[CompactionJob { partition_id, .. }] if partition_id == expected_partition,
"expected partition is available after lease is complete, but found {:?}", jobs
);
}
#[tokio::test]
async fn test_will_throttle_partition_if_no_commits() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let mut jobs = scheduler.get_jobs().await;
// no commit
// complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: is throttled
jobs = scheduler.get_jobs().await;
assert_matches!(
jobs[..],
[],
"expect partition should be throttled, but found {:?}",
jobs
);
}


@ -0,0 +1,84 @@
use std::sync::Arc;
use assert_matches::assert_matches;
use compactor_scheduler::{
create_scheduler, CompactionJob, LocalSchedulerConfig, Scheduler, SchedulerConfig,
};
use data_types::{ColumnType, ParquetFile, ParquetFileParams, PartitionId};
use iox_tests::{ParquetFileBuilder, TestCatalog, TestParquetFileBuilder, TestPartition};
mod end_job;
mod get_jobs;
mod update_job_status;
/// Test local_scheduler, with seeded files, used in these integration tests
#[derive(Debug)]
pub struct TestLocalScheduler {
pub scheduler: Arc<dyn Scheduler>,
pub catalog: Arc<TestCatalog>,
test_partition: Arc<TestPartition>,
seeded_files: (ParquetFile, ParquetFile),
}
impl TestLocalScheduler {
pub async fn builder() -> Self {
// create a catalog with a table with one partition
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_with_retention("ns", None).await;
let table = ns.create_table("table1").await;
table.create_column("time", ColumnType::Time).await;
table.create_column("load", ColumnType::F64).await;
let partition = table.create_partition("k").await;
// file builder
let file_builder = TestParquetFileBuilder::default().with_line_protocol("table1 load=1 11");
// seed with two existing parquet files
let seeded_files = (
partition
.create_parquet_file(file_builder.clone())
.await
.into(),
partition.create_parquet_file(file_builder).await.into(),
);
// local scheduler in default config
// created partitions are "hot" and should be returned with `get_jobs()`
let scheduler = create_scheduler(
SchedulerConfig::Local(LocalSchedulerConfig::default()),
catalog.catalog(),
Arc::clone(&catalog.time_provider()),
Arc::new(metric::Registry::default()),
false,
);
Self {
scheduler,
catalog,
test_partition: partition,
seeded_files,
}
}
pub fn get_seeded_files(&self) -> (ParquetFile, ParquetFile) {
self.seeded_files.clone()
}
pub async fn create_params_for_new_parquet_file(&self) -> ParquetFileParams {
ParquetFileBuilder::new(42)
.with_partition(self.get_partition_id().get())
.build()
.into()
}
pub fn assert_matches_seeded_hot_partition(&self, jobs: &[CompactionJob]) {
assert_matches!(
jobs[..],
[CompactionJob { partition_id, ..}] if partition_id == self.test_partition.partition.id
);
}
/// currently has only 1 partition seeded (by default)
pub fn get_partition_id(&self) -> PartitionId {
self.test_partition.partition.id
}
}


@ -0,0 +1,259 @@
use std::sync::Arc;
use assert_matches::assert_matches;
use compactor_scheduler::{CommitUpdate, CompactionJobStatus, CompactionJobStatusVariant};
use data_types::CompactionLevel;
use super::{super::helpers, TestLocalScheduler};
#[tokio::test]
async fn test_has_two_commit_types() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
let (existing_1, existing_2) = test_scheduler.get_seeded_files();
// upgrade commit
helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await;
// replacement commit
helpers::can_do_replacement_commit(
scheduler,
jobs[0].clone(),
vec![existing_2], // deleted parquet_files
vec![test_scheduler.create_params_for_new_parquet_file().await], // to_create
)
.await;
}
#[tokio::test]
async fn test_no_empty_commits_permitted() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
// empty commit
let res = scheduler
.update_job_status(CompactionJobStatus {
job: jobs[0].clone(),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
test_scheduler.get_partition_id(),
vec![],
vec![],
vec![],
CompactionLevel::Final, // no args
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("commit must have files to upgrade, and/or a set of files to replace (delete and create)"),
"should reject empty commits (no args provided), instead got {:?}", res
);
}
#[tokio::test]
async fn test_incomplete_replacement_commits_should_error() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
let (existing_1, existing_2) = test_scheduler.get_seeded_files();
// incomplete replacement commit
let res = scheduler
.update_job_status(CompactionJobStatus {
job: jobs[0].clone(),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
test_scheduler.get_partition_id(),
vec![existing_1.clone()], // to delete
vec![],
vec![], // missing to create
CompactionLevel::Final,
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("replacement commits must have both files to delete and files to create"),
"should reject incomplete replacement commit, instead got {:?}", res
);
// incomplete replacement commit, + complete upgrade commit
let res = scheduler
.update_job_status(CompactionJobStatus {
job: jobs[0].clone(),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
test_scheduler.get_partition_id(),
vec![existing_1], // to delete
vec![existing_2], // to upgrade
vec![], // missing to create
CompactionLevel::Final,
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("replacement commits must have both files to delete and files to create"),
"should reject incomplete replacement commit (even when an upgrade commit is also provided), but found {:?}", res
);
}
#[tokio::test]
async fn test_has_error_reporting() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
helpers::can_send_error(scheduler, jobs[0].clone()).await;
}
#[tokio::test]
async fn test_can_commit_after_error_reporting() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
let (existing_1, existing_2) = test_scheduler.get_seeded_files();
// error
helpers::can_send_error(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: can do commits
helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await;
helpers::can_do_replacement_commit(
scheduler,
jobs[0].clone(),
vec![existing_2], // deleted
vec![test_scheduler.create_params_for_new_parquet_file().await], // to create
)
.await;
}
#[tokio::test]
async fn test_can_report_errors_after_commits() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
let (existing_1, existing_2) = test_scheduler.get_seeded_files();
// do commits
helpers::can_do_upgrade_commit(Arc::clone(&scheduler), jobs[0].clone(), existing_1).await;
helpers::can_do_replacement_commit(
Arc::clone(&scheduler),
jobs[0].clone(),
vec![existing_2], // deleted
vec![test_scheduler.create_params_for_new_parquet_file().await], // to create
)
.await;
// TEST: can still send error
helpers::can_send_error(scheduler, jobs[0].clone()).await;
}
#[tokio::test]
async fn test_cannot_commit_after_complete() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
let (existing_1, _) = test_scheduler.get_seeded_files();
// mark job as complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: cannot do a commit
let res = scheduler
.update_job_status(CompactionJobStatus {
job: jobs[0].clone(),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
test_scheduler.get_partition_id(),
vec![],
vec![existing_1],
vec![],
CompactionLevel::Final,
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("Unknown or already done partition:"),
"should reject commit after complete, but found {:?}", res
);
}
#[tokio::test]
async fn test_can_do_more_error_reporting_after_complete() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
// mark job as complete
helpers::can_do_complete(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: can do more error reporting
helpers::can_send_error(scheduler, jobs[0].clone()).await;
}
#[tokio::test]
async fn test_cannot_commit_after_skipping() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
let (existing_1, _) = test_scheduler.get_seeded_files();
// mark partition as skipped (also completes the job)
helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: cannot do a commit
let res = scheduler
.update_job_status(CompactionJobStatus {
job: jobs[0].clone(),
status: CompactionJobStatusVariant::Update(CommitUpdate::new(
test_scheduler.get_partition_id(),
vec![],
vec![existing_1],
vec![],
CompactionLevel::Final,
)),
})
.await;
assert_matches!(
res,
Err(err) if err.to_string().contains("Unknown or already done partition:"),
"should reject commit after skipping, but found {:?}", res
);
}
#[tokio::test]
async fn test_can_do_more_error_reporting_after_skipping() {
test_helpers::maybe_start_logging();
let test_scheduler = TestLocalScheduler::builder().await;
let scheduler = Arc::clone(&test_scheduler.scheduler);
let jobs = scheduler.get_jobs().await;
// mark partition as skipped (also completes the job)
helpers::can_do_skip_request(Arc::clone(&scheduler), jobs[0].clone()).await;
// TEST: can do more error reporting
helpers::can_send_error(scheduler, jobs[0].clone()).await;
}


@ -0,0 +1,2 @@
mod helpers;
mod local_scheduler;


@ -1,7 +1,7 @@
//! Handles recording commit information to the test run log
use async_trait::async_trait;
use compactor::{Commit, CommitWrapper};
use compactor_scheduler::{Commit, CommitError, CommitWrapper};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use std::{
fmt::{Debug, Display},
@ -34,7 +34,7 @@ impl Commit for CommitRecorder {
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
) -> Result<Vec<ParquetFileId>, CommitError> {
if let Some(invariant_check) = self.invariant_check.as_ref() {
invariant_check.check().await
};


@ -127,7 +127,7 @@ impl TestSetupBuilder<false> {
metric_registry: catalog.metric_registry(),
trace_collector,
catalog: catalog.catalog(),
scheduler_config: SchedulerConfig::default(),
scheduler_config: SchedulerConfig::new_local_with_wrapper(Arc::new(commit_wrapper)),
parquet_store_real: catalog.parquet_store.clone(),
parquet_store_scratchpad: ParquetStorage::new(
Arc::new(object_store::memory::InMemory::new()),
@ -150,7 +150,6 @@ impl TestSetupBuilder<false> {
process_once: true,
simulate_without_object_store: false,
parquet_files_sink_override: None,
commit_wrapper: Some(Arc::new(commit_wrapper)),
all_errors_are_fatal: true,
max_num_columns_per_table: 200,
max_num_files_per_plan: 200,


@ -185,7 +185,6 @@ pub async fn create_compactor_server_type(
process_once: compactor_config.process_once,
simulate_without_object_store: false,
parquet_files_sink_override: None,
commit_wrapper: None,
all_errors_are_fatal: false,
max_num_columns_per_table: compactor_config.max_num_columns_per_table,
max_num_files_per_plan: compactor_config.max_num_files_per_plan,


@ -66,6 +66,7 @@ fn convert_shard_config(config: ShardConfigForLocalScheduler) -> Option<ShardCon
pub(crate) fn convert_scheduler_config(config: CompactorSchedulerConfig) -> SchedulerConfig {
match config.compactor_scheduler_type {
CompactorSchedulerType::Local => SchedulerConfig::Local(LocalSchedulerConfig {
commit_wrapper: None,
partitions_source_config: convert_partitions_source_config(
config.partition_source_config,
),


@ -67,7 +67,7 @@ prost-types = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
regex = { version = "1" }
regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "dfa-search", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.7" }
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls", "stream"] }
ring = { version = "0.16", features = ["std"] }
@ -138,7 +138,7 @@ prost-types = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
regex = { version = "1" }
regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-automata = { version = "0.3", default-features = false, features = ["dfa-onepass", "dfa-search", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.7" }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.21", default-features = false, features = ["dangerous_configuration", "logging", "tls12"] }