Merge branch 'main' into cn/ie2e

pull/24376/head
Dom 2023-02-14 11:12:38 +00:00 committed by GitHub
commit 123c36e5b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 380 additions and 326 deletions

232
Cargo.lock generated
View File

@ -82,9 +82,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299"
[[package]]
name = "anyhow"
version = "1.0.68"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
checksum = "224afbd727c3d6e4b90103ece64b8d1b67fbb1973b1046c2281eed3f3803f800"
[[package]]
name = "arrayref"
@ -361,7 +361,7 @@ version = "2.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9834fcc22e0874394a010230586367d4a3e9f11b560f469262678547e1d2575e"
dependencies = [
"bstr 1.1.0",
"bstr",
"doc-comment",
"predicates",
"predicates-core",
@ -473,9 +473,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.2"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1304eab461cf02bd70b083ed8273388f9724c549b316ba3d1e213ce0e9e7fb7e"
checksum = "4e246206a63c9830e118d12c894f56a82033da1a2361f5544deeee3df85c99d9"
dependencies = [
"async-trait",
"axum-core",
@ -485,7 +485,7 @@ dependencies = [
"http",
"http-body",
"hyper",
"itoa 1.0.5",
"itoa",
"matchit",
"memchr",
"mime",
@ -502,9 +502,9 @@ dependencies = [
[[package]]
name = "axum-core"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f487e40dc9daee24d8a1779df88522f159a54a980f99cfbe43db0be0bd3444a8"
checksum = "1cae3e661676ffbacb30f1a824089a8c9150e71017f7e1e38f2aa32009188d34"
dependencies = [
"async-trait",
"bytes",
@ -616,21 +616,9 @@ dependencies = [
[[package]]
name = "bstr"
version = "0.2.17"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
"serde",
]
[[package]]
name = "bstr"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45ea9b00a7b3f2988e9a65ad3917e62123c38dba709b666506207be96d1790b"
checksum = "b7f0778972c64420fdedc63f09919c8a88bda7b25135357fd25a5d9f3257e832"
dependencies = [
"memchr",
"once_cell",
@ -646,9 +634,9 @@ checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535"
[[package]]
name = "bytemuck"
version = "1.12.3"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaa3a8d9a1ca92e282c96a32d6511b695d7d994d1d102ba85d279f9b2756947f"
checksum = "c041d3eab048880cb0b86b256447da3f18859a163c3b8d8893f4e6368abe6393"
[[package]]
name = "byteorder"
@ -1166,9 +1154,9 @@ dependencies = [
[[package]]
name = "crc"
version = "3.0.0"
version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3"
checksum = "86ec7a15cbe22e59248fc7eadb1907dab5ba09372595da4d73dd805ed4417dfe"
dependencies = [
"crc-catalog",
]
@ -1324,13 +1312,12 @@ dependencies = [
[[package]]
name = "csv"
version = "1.1.6"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22813a6dc45b335f9bade10bf7271dc477e81113e89eb251a0bc2a8a81c536e1"
checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359"
dependencies = [
"bstr 0.2.17",
"csv-core",
"itoa 0.4.8",
"itoa",
"ryu",
"serde",
]
@ -1356,9 +1343,9 @@ dependencies = [
[[package]]
name = "cxx"
version = "1.0.86"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d1075c37807dcf850c379432f0df05ba52cc30f279c5cfc43cc221ce7f8579"
checksum = "90d59d9acd2a682b4e40605a242f6670eaa58c5957471cbf85e8aa6a0b97a5e8"
dependencies = [
"cc",
"cxxbridge-flags",
@ -1368,9 +1355,9 @@ dependencies = [
[[package]]
name = "cxx-build"
version = "1.0.86"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5044281f61b27bc598f2f6647d480aed48d2bf52d6eb0b627d84c0361b17aa70"
checksum = "ebfa40bda659dd5c864e65f4c9a2b0aff19bea56b017b9b77c73d3766a453a38"
dependencies = [
"cc",
"codespan-reporting",
@ -1383,15 +1370,15 @@ dependencies = [
[[package]]
name = "cxxbridge-flags"
version = "1.0.86"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61b50bc93ba22c27b0d31128d2d130a0a6b3d267ae27ef7e4fae2167dfe8781c"
checksum = "457ce6757c5c70dc6ecdbda6925b958aae7f959bda7d8fb9bde889e34a09dc03"
[[package]]
name = "cxxbridge-macro"
version = "1.0.86"
version = "1.0.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39e61fda7e62115119469c7b3591fd913ecca96fb766cfd3f2e2502ab7bc87a5"
checksum = "ebf883b7aacd7b2aeb2a7b338648ee19f57c140d4ee8e52c68979c6b2f7f2263"
dependencies = [
"proc-macro2",
"quote",
@ -1408,7 +1395,7 @@ dependencies = [
"hashbrown 0.12.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.6",
"parking_lot_core 0.9.7",
]
[[package]]
@ -1694,9 +1681,9 @@ checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f"
[[package]]
name = "encoding_rs"
version = "0.8.31"
version = "0.8.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
checksum = "071a31f4ee85403370b58aca746f01041ede6f0da2730960ad001edc2b71b394"
dependencies = [
"cfg-if",
]
@ -1755,22 +1742,22 @@ dependencies = [
[[package]]
name = "fastrand"
version = "1.8.0"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a407cfaa3385c4ae6b23e84623d48c2798d06e3e6a1878f7f59f17b3f86499"
checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
dependencies = [
"instant",
]
[[package]]
name = "fd-lock"
version = "3.0.8"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb21c69b9fea5e15dbc1049e4b77145dd0ba1c84019c488102de0dc4ea4b0a27"
checksum = "8ef1a30ae415c3a691a4f41afddc2dbcd6d70baf338368d85ebc1e8ed92cedb9"
dependencies = [
"cfg-if",
"rustix",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@ -1859,7 +1846,7 @@ dependencies = [
"futures-core",
"futures-sink",
"pin-project",
"spin 0.9.4",
"spin 0.9.5",
]
[[package]]
@ -2050,9 +2037,9 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.27.0"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec7af912d60cdbd3677c1af9352ebae6fb8394d165568a2234df0fa00f87793"
checksum = "221996f774192f0f718773def8201c4ae31f02616a54ccfc2d358bb0e5cefdec"
[[package]]
name = "glob"
@ -2209,7 +2196,7 @@ dependencies = [
"lazy_static",
"libc",
"pprof 0.10.1",
"spin 0.9.4",
"spin 0.9.5",
"thiserror",
"tikv-jemalloc-sys",
]
@ -2243,9 +2230,9 @@ dependencies = [
[[package]]
name = "hermit-abi"
version = "0.3.0"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "856b5cb0902c2b6d65d5fd97dfa30f9b70c7538e770b98eab5ed52d8db923e01"
checksum = "fed44880c466736ef9a5c5b5facefb5ed0785676d0c02d612db14e54f0d84286"
[[package]]
name = "hex"
@ -2279,7 +2266,7 @@ checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes",
"fnv",
"itoa 1.0.5",
"itoa",
]
[[package]]
@ -2332,7 +2319,7 @@ dependencies = [
"http-body",
"httparse",
"httpdate",
"itoa 1.0.5",
"itoa",
"pin-project-lite",
"socket2",
"tokio",
@ -2440,14 +2427,14 @@ dependencies = [
[[package]]
name = "inferno"
version = "0.11.13"
version = "0.11.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7207d75fcf6c1868f1390fc1c610431fe66328e9ee6813330a041ef6879eca1"
checksum = "2fb7c1b80a1dfa604bb4a649a5c5aeef3d913f7c520cb42b40e534e8a61bcdfc"
dependencies = [
"ahash 0.8.3",
"atty",
"indexmap",
"itoa 1.0.5",
"is-terminal",
"itoa",
"log",
"num-format",
"once_cell",
@ -3232,7 +3219,7 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22e18b0a45d56fe973d6db23972bf5bc46f988a4a2385deac9cc29572f09daef"
dependencies = [
"hermit-abi 0.3.0",
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.45.0",
@ -3247,12 +3234,6 @@ dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "itoa"
version = "1.0.5"
@ -3270,9 +3251,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.60"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47"
checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730"
dependencies = [
"wasm-bindgen",
]
@ -3741,9 +3722,9 @@ dependencies = [
[[package]]
name = "num-complex"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ae39348c8bc5fbd7f40c727a9925f03517afd2ab27d46702108b6a7e5414c19"
checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d"
dependencies = [
"num-traits",
]
@ -3755,7 +3736,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a652d9771a63711fd3c3deb670acfbe5c30a4072e664d7a3bf5a9e1056ac72c3"
dependencies = [
"arrayvec",
"itoa 1.0.5",
"itoa",
]
[[package]]
@ -3813,9 +3794,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.30.2"
version = "0.30.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b8c786513eb403643f2a88c244c2aaa270ef2153f55094587d0c48a3cf22a83"
checksum = "ea86265d3d3dcb6a27fc51bd29a4bf387fae9d2986b823079d4986af253eb439"
dependencies = [
"memchr",
]
@ -3879,7 +3860,7 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66"
dependencies = [
"parking_lot_core 0.9.6",
"parking_lot_core 0.9.7",
]
[[package]]
@ -3954,7 +3935,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.6",
"parking_lot_core 0.9.7",
]
[[package]]
@ -3973,15 +3954,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.6"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba1ef8814b5c993410bb3adfad7a5ed269563e4a2f90c41f5d85be7fb47133bf"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
]
[[package]]
@ -4135,9 +4116,9 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pest"
version = "2.5.3"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4257b4a04d91f7e9e6290be5d3da4804dd5784fafde3a497d73eb2b4a158c30a"
checksum = "028accff104c4e513bad663bbcd2ad7cfd5304144404c31ed0a77ac103d00660"
dependencies = [
"thiserror",
"ucd-trie",
@ -4145,9 +4126,9 @@ dependencies = [
[[package]]
name = "pest_derive"
version = "2.5.3"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "241cda393b0cdd65e62e07e12454f1f25d57017dcc514b1514cd3c4645e3a0a6"
checksum = "2ac3922aac69a40733080f53c1ce7f91dcf57e1a5f6c52f421fadec7fbdc4b69"
dependencies = [
"pest",
"pest_generator",
@ -4155,9 +4136,9 @@ dependencies = [
[[package]]
name = "pest_generator"
version = "2.5.3"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46b53634d8c8196302953c74d5352f33d0c512a9499bd2ce468fc9f4128fa27c"
checksum = "d06646e185566b5961b4058dd107e0a7f56e77c3f484549fb119867773c0f202"
dependencies = [
"pest",
"pest_meta",
@ -4168,9 +4149,9 @@ dependencies = [
[[package]]
name = "pest_meta"
version = "2.5.3"
version = "2.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ef4f1332a8d4678b41966bb4cc1d0676880e84183a1ecc3f4b69f03e99c7a51"
checksum = "e6f60b2ba541577e2a0c307c8f39d1439108120eb7903adeb6497fa880c59616"
dependencies = [
"once_cell",
"pest",
@ -4179,9 +4160,9 @@ dependencies = [
[[package]]
name = "petgraph"
version = "0.6.2"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4"
dependencies = [
"fixedbitset",
"indexmap",
@ -4736,9 +4717,9 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.10.1"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cac410af5d00ab6884528b4ab69d1e8e146e8d471201800fa1b4524126de6ad3"
checksum = "356a0625f1954f730c0201cdab48611198dc6ce21f4acff55089b5a78e6e835b"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
@ -4844,9 +4825,9 @@ dependencies = [
[[package]]
name = "rgb"
version = "0.8.34"
version = "0.8.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3603b7d71ca82644f79b5a06d1220e9a58ede60bd32255f698cb1af8838b8db3"
checksum = "7495acf66551cdb696b7711408144bcd3194fc78e32f3a09e809bfe7dd4a7ce3"
dependencies = [
"bytemuck",
]
@ -5113,7 +5094,7 @@ version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [
"itoa 1.0.5",
"itoa",
"ryu",
"serde",
]
@ -5134,7 +5115,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa 1.0.5",
"itoa",
"ryu",
"serde",
]
@ -5349,9 +5330,9 @@ dependencies = [
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1"
dependencies = [
"libc",
]
@ -5429,9 +5410,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.4"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f6002a767bff9e83f8eeecf883ecb8011875a21ae8da43bffb817a57e78cc09"
checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc"
dependencies = [
"lock_api",
]
@ -5507,7 +5488,7 @@ dependencies = [
"hkdf",
"hmac",
"indexmap",
"itoa 1.0.5",
"itoa",
"libc",
"libsqlite3-sys",
"log",
@ -5702,9 +5683,9 @@ dependencies = [
[[package]]
name = "sync_wrapper"
version = "0.1.1"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "synchronized-writer"
@ -5821,10 +5802,11 @@ dependencies = [
[[package]]
name = "thread_local"
version = "1.1.4"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
@ -5912,9 +5894,9 @@ dependencies = [
[[package]]
name = "tinyvec_macros"
version = "0.1.0"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
@ -6357,9 +6339,9 @@ checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicode-bidi"
version = "0.3.8"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58"
[[package]]
name = "unicode-ident"
@ -6378,9 +6360,9 @@ dependencies = [
[[package]]
name = "unicode-segmentation"
version = "1.10.0"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a"
checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36"
[[package]]
name = "unicode-width"
@ -6516,9 +6498,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268"
checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@ -6526,9 +6508,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142"
checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9"
dependencies = [
"bumpalo",
"log",
@ -6541,9 +6523,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.33"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d"
checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454"
dependencies = [
"cfg-if",
"js-sys",
@ -6553,9 +6535,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810"
checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -6563,9 +6545,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c"
checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6"
dependencies = [
"proc-macro2",
"quote",
@ -6576,9 +6558,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.83"
version = "0.2.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f"
checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d"
[[package]]
name = "wasm-streams"
@ -6595,9 +6577,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.60"
version = "0.3.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f"
checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97"
dependencies = [
"js-sys",
"wasm-bindgen",
@ -6624,9 +6606,9 @@ dependencies = [
[[package]]
name = "which"
version = "4.3.0"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b"
checksum = "2441c784c52b289a054b7201fc93253e288f094e2f4be9058343127c4226a269"
dependencies = [
"either",
"libc",
@ -6821,7 +6803,6 @@ dependencies = [
"rand",
"rand_core",
"regex",
"regex-automata",
"regex-syntax",
"reqwest",
"ring",
@ -6852,6 +6833,7 @@ dependencies = [
"uuid",
"winapi",
"windows-sys 0.42.0",
"windows-sys 0.45.0",
"zstd",
"zstd-safe",
"zstd-sys",

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
use arrow::{
array::{new_null_array, ArrayRef, StringArray},
compute::concat_batches,
datatypes::SchemaRef,
error::ArrowError,
record_batch::RecordBatch,
@ -56,23 +55,3 @@ pub fn ensure_schema(
RecordBatch::try_new(Arc::clone(output_schema), batch_output_columns)
}
/// Merge the record batches into one record batch
/// and pad null values to columns that are not available in certain batches
pub fn merge_record_batches(
output_schema: &SchemaRef,
batches: Vec<Arc<RecordBatch>>,
) -> Result<Option<RecordBatch>, ArrowError> {
// Add null values for non-existing columns
let batches = batches
.iter()
.map(|batch| ensure_schema(output_schema, batch.as_ref()))
.collect::<Result<Vec<_>, _>>()?;
// Combine batches
if batches.is_empty() {
return Ok(None);
}
Ok(Some(concat_batches(output_schema, &batches)?))
}

View File

@ -48,9 +48,7 @@ use super::{
dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper,
object_store::ObjectStoreParquetFileSink,
},
parquet_files_sink::{
dispatch::DispatchParquetFilesSink, simulator::ParquetFileSimulator, ParquetFilesSink,
},
parquet_files_sink::{dispatch::DispatchParquetFilesSink, ParquetFilesSink},
partition_done_sink::{
catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
@ -240,21 +238,22 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
} else {
Arc::new(DedicatedDataFusionPlanExec::new(Arc::clone(&config.exec)))
};
let parquet_files_sink: Arc<dyn ParquetFilesSink> = if config.simulate_without_object_store {
Arc::new(ParquetFileSimulator::new())
} else {
let parquet_file_sink = Arc::new(LoggingParquetFileSinkWrapper::new(
DedicatedExecParquetFileSinkWrapper::new(
ObjectStoreParquetFileSink::new(
config.shard_id,
config.parquet_store_scratchpad.clone(),
Arc::clone(&config.time_provider),
let parquet_files_sink: Arc<dyn ParquetFilesSink> =
if let Some(sink) = config.parquet_files_sink_override.as_ref() {
Arc::clone(sink)
} else {
let parquet_file_sink = Arc::new(LoggingParquetFileSinkWrapper::new(
DedicatedExecParquetFileSinkWrapper::new(
ObjectStoreParquetFileSink::new(
config.shard_id,
config.parquet_store_scratchpad.clone(),
Arc::clone(&config.time_provider),
),
Arc::clone(&config.exec),
),
Arc::clone(&config.exec),
),
));
Arc::new(DispatchParquetFilesSink::new(parquet_file_sink))
};
));
Arc::new(DispatchParquetFilesSink::new(parquet_file_sink))
};
Arc::new(Components {
partition_stream,

View File

@ -15,6 +15,7 @@ pub mod logging;
pub mod mock;
pub mod object_store;
/// Writes streams if data to the object store as one or more parquet files
#[async_trait]
pub trait ParquetFileSink: Debug + Display + Send + Sync {
async fn store(

View File

@ -10,13 +10,16 @@ use datafusion::physical_plan::SendableRecordBatchStream;
use crate::{error::DynError, partition_info::PartitionInfo, plan_ir::PlanIR};
pub mod dispatch;
pub mod simulator;
/// Writes streams, which corresponds to the `plan_ir.files()` to
/// parquet files on object store, returning information about the
/// files that were created.
#[async_trait]
pub trait ParquetFilesSink: Debug + Display + Send + Sync {
/// Writes streams, which corresponds to the `plan_ir.files()` to
/// parquet files on object store, returning information about the
/// files that were created.
/// Writes the streams of RecordBatches, corresponding to the list
/// of files on `plan_ir` to parquet files of the specified
/// `target_level` files on object store, and returns the details
/// needed to create entries in the catalog for those files.
async fn stream_into_file_sink(
&self,
streams: Vec<SendableRecordBatchStream>,

View File

@ -35,6 +35,7 @@ pub fn log_config(config: &Config) {
compact_version,
min_num_l1_files_to_compact,
process_once,
parquet_files_sink_override: parquet_files_sink,
simulate_without_object_store,
all_errors_are_fatal,
} = &config;
@ -48,6 +49,10 @@ pub fn log_config(config: &Config) {
}
};
let parquet_files_sink = parquet_files_sink
.as_ref()
.map(|_| "Some")
.unwrap_or("None");
info!(
shard_id=shard_id.get(),
?metric_registry,
@ -76,6 +81,7 @@ pub fn log_config(config: &Config) {
min_num_l1_files_to_compact,
process_once,
simulate_without_object_store,
%parquet_files_sink,
all_errors_are_fatal,
"config",
);

View File

@ -8,6 +8,8 @@ use iox_query::exec::Executor;
use iox_time::TimeProvider;
use parquet_file::storage::ParquetStorage;
use crate::components::parquet_files_sink::ParquetFilesSink;
/// Config to set up a compactor.
#[derive(Debug, Clone)]
pub struct Config {
@ -118,6 +120,9 @@ pub struct Config {
/// This is useful for testing.
pub simulate_without_object_store: bool,
/// Use the provided [`ParquetFilesSink`] to create parquet files (used for testing)
pub parquet_files_sink_override: Option<Arc<dyn ParquetFilesSink>>,
/// Ensure that ALL errors (including object store errors) result in "skipped" partitions.
///
/// This is mostly useful for testing.

View File

@ -183,11 +183,12 @@ mod plan_ir;
// publically expose items needed for testing
pub use components::{
df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper, parquet_files_sink::simulator::ParquetFileSimulator,
Components,
namespaces_source::mock::NamespaceWrapper, parquet_files_sink::ParquetFilesSink, Components,
};
pub use driver::compact;
pub use error::DynError;
pub use partition_info::PartitionInfo;
pub use plan_ir::PlanIR;
#[cfg(test)]
mod test_utils;

View File

@ -6,22 +6,30 @@ use data_types::{ChunkOrder, ParquetFile};
/// Describes a specific compactor plan to create.
pub enum PlanIR {
/// Compact `files` into a single large output file
Compact { files: Vec<FileIR> },
/// Compact `files` into multiple files, for each entry in
/// `split_times`. If there are n split entries in split_times,
/// there will be `n+1` output files.
///
/// The contents of each file:
/// * `0`: Rows that have `time` *on or before* the `split_times[0]`
/// * `i (0 < i < split_times's length)`: Rows that have `time` in range `(split_times[i-1], split_times[i]]`
/// * `n (n = split_times.len())`: Rows that have `time` *after* all the `split_times` and NULL rows
Split {
Compact {
/// The files to be compacted
files: Vec<FileIR>,
},
/// Compact `files` into multiple files, for each entry in
/// `split_times`
Split {
/// The files to be compacted.
files: Vec<FileIR>,
/// The timestamps at which to split the data
///
/// If there are n split entries in split_times,
/// there will be `n+1` output files.
///
/// The contents of each file:
/// * `0`: Rows that have `time` *on or before* the `split_times[0]`
/// * `i (0 < i < split_times's length)`: Rows that have `time` in range `(split_times[i-1], split_times[i]]`
/// * `n (n = split_times.len())`: Rows that have `time` *after* all the `split_times` and NULL rows
split_times: Vec<i64>,
},
}
impl PlanIR {
/// Return the number of output files produced
pub fn n_output_files(&self) -> usize {
match self {
Self::Compact { .. } => 1,
@ -29,6 +37,7 @@ impl PlanIR {
}
}
/// return the input files that will be compacted together
pub fn input_files(&self) -> &[FileIR] {
match self {
Self::Compact { files } => files,

View File

@ -5,7 +5,7 @@ use data_types::{CompactionLevel, ParquetFile, PartitionId};
use iox_tests::TestParquetFileBuilder;
use compactor2::config::AlgoVersion;
use compactor2_test_utils::{format_files, list_object_store, TestSetup};
use compactor2_test_utils::{format_files, list_object_store, TestSetup, TestSetupBuilder};
#[tokio::test]
async fn test_compact_no_file() {
@ -532,6 +532,8 @@ async fn assert_skipped_compactions<const N: usize>(
// (TODO move these to a separate module)
// ----------------------------
const ONE_MB: u64 = 1024 * 1024;
/// creates a TestParquetFileBuilder setup for layout tests
fn parquet_builder() -> TestParquetFileBuilder {
TestParquetFileBuilder::default()
@ -540,23 +542,32 @@ fn parquet_builder() -> TestParquetFileBuilder {
.with_line_protocol("table,tag1=A,tag2=B,tag3=C field_int=1i 100")
}
/// runs the scenario and returns a string based output for comparison
async fn run_layout_scenario(setup: &TestSetup, input_files: Vec<ParquetFile>) -> Vec<String> {
setup.catalog.time_provider.inc(Duration::from_nanos(200));
/// Creates the default TestSetupBuilder for layout tests
async fn layout_setup_builder() -> TestSetupBuilder<false> {
// Goal is to keep these as close to the compactor defaults (in
// clap_blocks) as possible so we can predict what the compactor
// will do in production with default settings
TestSetup::builder()
.await
.with_compact_version(AlgoVersion::TargetLevel)
.with_min_num_l1_files_to_compact(10)
.simulate_without_object_store()
}
// record the input files
let mut output = format_files("**** Input Files", &input_files);
/// runs the scenario and returns a string based output for comparison
async fn run_layout_scenario(setup: &TestSetup) -> Vec<String> {
setup.catalog.time_provider.inc(Duration::from_nanos(200));
// run the actual compaction
let compact_result = setup.run_compact().await;
assert_skipped_compactions(setup, []).await;
// record what the compactor actually did
output.extend(compact_result.simulator_runs);
let mut output = compact_result.simulator_runs;
// record the output files
// record the final state of the catalog
let output_files = setup.list_by_table_not_to_delete().await;
output.extend(format_files("**** Output Files", &output_files));
output.extend(format_files("**** Final Output Files ", &output_files));
output
}
@ -564,50 +575,43 @@ async fn run_layout_scenario(setup: &TestSetup, input_files: Vec<ParquetFile>) -
#[tokio::test]
async fn layout_all_overlapping() {
test_helpers::maybe_start_logging();
let one_mb = 1024 * 1024;
let setup = TestSetup::builder()
let setup = layout_setup_builder()
.await
.simulate_without_object_store()
.with_max_desired_file_size_bytes(20 * one_mb)
.with_max_desired_file_size_bytes(20 * ONE_MB)
.build()
.await;
// create virtual files
let mut input_files = vec![];
for _ in 0..10 {
let file = setup
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(100)
.with_max_time(200)
.with_file_size_bytes(one_mb),
.with_file_size_bytes(ONE_MB),
)
.await
.parquet_file;
input_files.push(file);
.await;
}
insta::assert_yaml_snapshot!(
run_layout_scenario(&setup, input_files).await,
run_layout_scenario(&setup).await,
@r###"
---
- "**** Input Files"
- "**** Simulation run 0, type=split(split_times=[180]). Input Files:"
- "L0, all files 1mb "
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
- "L0.4[100,200] |-------------------------------------L0.4-------------------------------------|"
- "L0.5[100,200] |-------------------------------------L0.5-------------------------------------|"
- "L0.6[100,200] |-------------------------------------L0.6-------------------------------------|"
- "L0.7[100,200] |-------------------------------------L0.7-------------------------------------|"
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
- "L0.10[100,200] |------------------------------------L0.10-------------------------------------|"
- "**** Simulation Run 0, type=split(split_times=[180])"
- "Input, 10 files: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10"
- "**** Output Files"
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
- "L0.7[100,200] |-------------------------------------L0.7-------------------------------------|"
- "L0.6[100,200] |-------------------------------------L0.6-------------------------------------|"
- "L0.5[100,200] |-------------------------------------L0.5-------------------------------------|"
- "L0.4[100,200] |-------------------------------------L0.4-------------------------------------|"
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,180] 8mb |----------------------------L1.11-----------------------------| "
- "L1.12[180,200] 2mb |----L1.12-----|"
@ -618,13 +622,11 @@ async fn layout_all_overlapping() {
#[tokio::test]
async fn layout_l1_with_new_non_overlapping_l0() {
test_helpers::maybe_start_logging();
let one_hundred_mb = 100 * 1024 * 1024;
let five_kb = 5 * 1024;
let setup = TestSetup::builder()
let setup = layout_setup_builder()
.await
.simulate_without_object_store()
.with_max_desired_file_size_bytes(one_hundred_mb)
.with_max_desired_file_size_bytes(100 * ONE_MB)
.build()
.await;
@ -633,23 +635,20 @@ async fn layout_l1_with_new_non_overlapping_l0() {
//
// L1: 100MB, 100MB, 100MB, 100MB
// L0: 5k, 5k, 5k, 5k, 5k (all non overlapping with the L1 files)
let mut input_files = vec![];
for i in 0..4 {
let file = setup
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(50 + i * 50)
.with_max_time(100 + i * 50)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(one_hundred_mb),
.with_file_size_bytes(100 * ONE_MB),
)
.await
.parquet_file;
input_files.push(file);
.await;
}
for i in 0..5 {
let file = setup
setup
.partition
.create_parquet_file(
parquet_builder()
@ -657,37 +656,35 @@ async fn layout_l1_with_new_non_overlapping_l0() {
.with_max_time(350 + i * 50)
.with_file_size_bytes(five_kb),
)
.await
.parquet_file;
input_files.push(file);
.await;
}
setup.catalog.time_provider.inc(Duration::from_nanos(200));
insta::assert_yaml_snapshot!(
run_layout_scenario(&setup, input_files).await,
run_layout_scenario(&setup).await,
@r###"
---
- "**** Input Files"
- "L0 "
- "L0.5[300,350] 5kb |-L0.5-| "
- "L0.6[350,400] 5kb |-L0.6-| "
- "L0.7[400,450] 5kb |-L0.7-| "
- "L0.8[450,500] 5kb |-L0.8-| "
- "L0.9[500,550] 5kb |-L0.9-|"
- "**** Simulation run 0, type=compact. Input Files:"
- "L0, all files 5kb "
- "L0.9[500,550] |-----L0.9-----|"
- "L0.8[450,500] |-----L0.8-----| "
- "L0.7[400,450] |-----L0.7-----| "
- "L0.6[350,400] |-----L0.6-----| "
- "L0.5[300,350] |-----L0.5-----| "
- "**** Simulation run 1, type=split(split_times=[175, 300, 425]). Input Files:"
- "L1 "
- "L1.1[50,100] 100mb |-L1.1-| "
- "L1.2[100,150] 100mb |-L1.2-| "
- "L1.3[150,200] 100mb |-L1.3-| "
- "L1.4[200,250] 100mb |-L1.4-| "
- "**** Simulation Run 0, type=split(split_times=[175, 300, 425])"
- "Input, 9 files: 1, 2, 3, 4, 5, 6, 7, 8, 9"
- "**** Output Files"
- "L1, all files 100.01mb "
- "L1.10[50,175] |------L1.10-------| "
- "L1.11[175,300] |------L1.11-------| "
- "L1.12[300,425] |------L1.12-------| "
- "L1.13[425,550] |------L1.13-------|"
- "L1.3[150,200] 100mb |-L1.3-| "
- "L1.2[100,150] 100mb |-L1.2-| "
- "L1.1[50,100] 100mb |-L1.1-| "
- "L1.10[300,550] 25kb |----------------L1.10-----------------|"
- "**** Final Output Files "
- "L2, all files 100.01mb "
- "L2.11[50,175] |------L2.11-------| "
- "L2.12[175,300] |------L2.12-------| "
- "L2.13[300,425] |------L2.13-------| "
- "L2.14[425,550] |------L2.14-------|"
"###
);
}
@ -697,10 +694,9 @@ async fn layout_l1_with_new_non_overlapping_l0_larger() {
test_helpers::maybe_start_logging();
let one_mb = 1024 * 1024;
let setup = TestSetup::builder()
let setup = layout_setup_builder()
.await
.simulate_without_object_store()
.with_max_desired_file_size_bytes(100 * one_mb)
.with_max_desired_file_size_bytes(100 * ONE_MB)
.build()
.await;
@ -709,10 +705,9 @@ async fn layout_l1_with_new_non_overlapping_l0_larger() {
//
// L1: 20MB, 50MB, 20MB, 3MB
// L0: 5MB, 5MB, 5MB
let mut input_files = vec![];
for (i, sz) in [20, 50, 20, 3].iter().enumerate() {
let i = i as i64;
let file = setup
setup
.partition
.create_parquet_file(
parquet_builder()
@ -721,12 +716,10 @@ async fn layout_l1_with_new_non_overlapping_l0_larger() {
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(sz * one_mb),
)
.await
.parquet_file;
input_files.push(file);
.await;
}
for i in 0..3 {
let file = setup
setup
.partition
.create_parquet_file(
parquet_builder()
@ -734,33 +727,32 @@ async fn layout_l1_with_new_non_overlapping_l0_larger() {
.with_max_time(350 + i * 50)
.with_file_size_bytes(5 * one_mb),
)
.await
.parquet_file;
input_files.push(file);
.await;
}
setup.catalog.time_provider.inc(Duration::from_nanos(200));
insta::assert_yaml_snapshot!(
run_layout_scenario(&setup, input_files).await,
run_layout_scenario(&setup).await,
@r###"
---
- "**** Input Files"
- "L0 "
- "L0.5[300,350] 5mb |--L0.5--| "
- "L0.6[350,400] 5mb |--L0.6--| "
- "L0.7[400,450] 5mb |--L0.7--|"
- "**** Simulation run 0, type=split(split_times=[420]). Input Files:"
- "L0, all files 5mb "
- "L0.7[400,450] |----------L0.7----------| "
- "L0.6[350,400] |----------L0.6----------| "
- "L0.5[300,350] |----------L0.5----------| "
- "**** Simulation run 1, type=split(split_times=[421]). Input Files:"
- "L1 "
- "L1.1[50,100] 20mb |--L1.1--| "
- "L1.2[100,150] 50mb |--L1.2--| "
- "L1.3[150,200] 20mb |--L1.3--| "
- "L1.4[200,250] 3mb |--L1.4--| "
- "**** Simulation Run 0, type=split(split_times=[421])"
- "Input, 7 files: 1, 2, 3, 4, 5, 6, 7"
- "**** Output Files"
- "L1 "
- "L1.8[50,421] 100.17mb|----------------------------------L1.8----------------------------------| "
- "L1.9[421,450] 7.83mb |L1.9|"
- "L1.3[150,200] 20mb |--L1.3--| "
- "L1.2[100,150] 50mb |--L1.2--| "
- "L1.1[50,100] 20mb |--L1.1--| "
- "L1.9[420,450] 3mb |L1.9|"
- "L1.8[300,420] 12mb |---------L1.8---------| "
- "**** Final Output Files "
- "L2 "
- "L2.10[50,421] 100.17mb|---------------------------------L2.10----------------------------------| "
- "L2.11[421,450] 7.83mb |L2.11|"
"###
);
}

View File

@ -13,8 +13,10 @@
)]
mod display;
mod simulator;
pub use display::{format_files, format_files_split};
use iox_query::exec::ExecutorType;
use simulator::ParquetFileSimulator;
use tracker::AsyncSemaphoreMetrics;
use std::{collections::HashSet, future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
@ -36,7 +38,7 @@ use schema::sort::SortKey;
use compactor2::{
compact,
config::{AlgoVersion, Config, PartitionsSourceConfig},
hardcoded_components, Components, PanicDataFusionPlanner, ParquetFileSimulator, PartitionInfo,
hardcoded_components, Components, PanicDataFusionPlanner, PartitionInfo,
};
// Default values for the test setup builder
@ -112,6 +114,7 @@ impl TestSetupBuilder<false> {
min_num_l1_files_to_compact: MIN_NUM_L1_FILES_TO_COMPACT,
process_once: true,
simulate_without_object_store: false,
parquet_files_sink_override: None,
all_errors_are_fatal: true,
};
@ -305,6 +308,7 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
/// set simulate_without_object_store
pub fn simulate_without_object_store(mut self) -> Self {
self.config.simulate_without_object_store = true;
self.config.parquet_files_sink_override = Some(Arc::new(ParquetFileSimulator::new()));
self
}

View File

@ -1,18 +1,20 @@
use std::sync::{Arc, Mutex};
use std::{
collections::BTreeSet,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use data_types::{
ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber,
ShardId, Timestamp,
ColumnSet, CompactionLevel, ParquetFile, ParquetFileParams, SequenceNumber, ShardId, Timestamp,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_time::Time;
use observability_deps::tracing::{debug, info};
use uuid::Uuid;
use crate::{error::DynError, partition_info::PartitionInfo, plan_ir::PlanIR};
use compactor2::{DynError, ParquetFilesSink, PartitionInfo, PlanIR};
use super::ParquetFilesSink;
use crate::format_files;
/// Simulates the result of running a compaction plan that
/// produces multiple parquet files.
@ -67,18 +69,11 @@ impl ParquetFileSimulator {
runs.into_iter()
.enumerate()
.flat_map(|(i, run)| {
vec![
format!("**** Simulation Run {}, type={}", i, run.plan_type),
format!(
"Input, {} files: {}",
run.input_file_ids.len(),
run.input_file_ids
.into_iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(", ")
),
]
let title = format!(
"**** Simulation run {}, type={}. Input Files:",
i, run.plan_type
);
format_files(title, &run.input_parquet_files)
})
.collect()
}
@ -120,19 +115,28 @@ impl ParquetFilesSink for ParquetFileSimulator {
.iter()
.map(|f| SimulatedFile::from(&f.file))
.collect();
let input_file_ids: Vec<_> = plan_ir.input_files().iter().map(|f| f.file.id).collect();
let input_parquet_files: Vec<_> = plan_ir
.input_files()
.iter()
.map(|f| f.file.clone())
.collect();
let column_set = overall_column_set(input_parquet_files.iter());
let output_files = even_time_split(&input_files, split_times, target_level);
let partition_info = partition_info.as_ref();
// Compute final output
let output: Vec<_> = output_files
.into_iter()
.map(|f| f.into_parquet_file_params(max_l0_created_at, partition_info.as_ref()))
.map(|f| {
f.into_parquet_file_params(max_l0_created_at, column_set.clone(), partition_info)
})
.collect();
// record what we did
self.runs.lock().unwrap().push(SimulatedRun {
plan_type,
input_file_ids,
input_parquet_files,
});
Ok(output)
@ -180,6 +184,7 @@ impl SimulatedFile {
fn into_parquet_file_params(
self,
max_l0_created_at: Time,
column_set: ColumnSet,
partition_info: &PartitionInfo,
) -> ParquetFileParams {
let Self {
@ -203,7 +208,7 @@ impl SimulatedFile {
row_count,
compaction_level,
created_at: Timestamp::new(1),
column_set: ColumnSet::new(vec![]),
column_set,
max_l0_created_at: max_l0_created_at.into(),
}
}
@ -214,10 +219,18 @@ impl SimulatedFile {
#[derive(Debug, Clone)]
pub struct SimulatedRun {
// fields are used in testing
#[allow(dead_code)]
plan_type: String,
#[allow(dead_code)]
input_file_ids: Vec<ParquetFileId>,
input_parquet_files: Vec<ParquetFile>,
}
fn overall_column_set<'a>(files: impl IntoIterator<Item = &'a ParquetFile>) -> ColumnSet {
let all_columns = files
.into_iter()
.fold(BTreeSet::new(), |mut columns, file| {
columns.extend(file.column_set.iter().cloned());
columns
});
ColumnSet::new(all_columns)
}
/// Calculate simulated output files based on splitting files

View File

@ -3,34 +3,25 @@
//! [keywords]: https://docs.influxdata.com/influxdb/v1.8/query_language/spec/#keywords
use crate::internal::ParseResult;
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::bytes::complete::tag_no_case;
use nom::character::complete::alpha1;
use nom::combinator::{eof, fail, peek, verify};
use nom::combinator::{fail, verify};
use nom::sequence::terminated;
use nom::FindToken;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
/// Peeks at the input for acceptable characters separating a keyword.
/// Verifies the next character of `i` is valid following a keyword.
///
/// Will return a failure if one of the expected characters is not found.
fn keyword_follow_char(i: &str) -> ParseResult<&str, &str> {
peek(alt((
tag(" "),
tag("\n"),
tag(";"),
tag("("),
tag(")"),
tag("\t"),
tag(","),
tag("="),
tag("!"), // possible !=
tag("/"), // possible comment
tag("-"), // possible comment
eof,
fail, // Return a failure if we reach the end of this alternation
)))(i)
/// Keywords may be followed by whitespace, statement terminator (;), parens,
/// or conditional and arithmetic operators or EOF
fn keyword_follow_char(i: &str) -> ParseResult<&str, ()> {
if i.is_empty() || b" \n\t;(),=!><+-/*|&^%".find_token(i.bytes().next().unwrap()) {
Ok((i, ()))
} else {
fail(i)
}
}
/// Token represents a string with case-insensitive ordering and equality.
@ -162,6 +153,7 @@ pub fn keyword<'a>(keyword: &'static str) -> impl FnMut(&'a str) -> ParseResult<
#[cfg(test)]
mod test {
use super::*;
use crate::assert_error;
use assert_matches::assert_matches;
#[test]
@ -278,13 +270,36 @@ mod test {
// Will fail because keyword `OR` in `ORDER` is not recognized, as is not terminated by a valid character
let err = or_keyword("ORDER").unwrap_err();
assert_matches!(err, nom::Err::Error(crate::internal::Error::Nom(_, kind)) if kind == nom::error::ErrorKind::Fail);
}
// test valid follow-on characters
#[test]
fn test_keyword_followed_by_valid_char() {
let mut tag_keyword = keyword("TAG");
let (rem, got) = tag_keyword("tag!").unwrap();
assert_eq!(rem, "!");
// followed by EOF
let (rem, got) = tag_keyword("tag").unwrap();
assert_eq!(rem, "");
assert_eq!(got, "tag");
//
// Test some of the expected characters
//
let (rem, got) = tag_keyword("tag!=foo").unwrap();
assert_eq!(rem, "!=foo");
assert_eq!(got, "tag");
let (rem, got) = tag_keyword("tag>foo").unwrap();
assert_eq!(rem, ">foo");
assert_eq!(got, "tag");
let (rem, got) = tag_keyword("tag&1 = foo").unwrap();
assert_eq!(rem, "&1 = foo");
assert_eq!(got, "tag");
// Fallible
assert_error!(tag_keyword("tag$"), Fail);
}
#[test]

View File

@ -93,7 +93,10 @@ pub async fn command(config: Config) -> Result<()> {
);
}
// Ensure panics are fatal when running in this server mode.
// Ensure panics (even in threads or tokio tasks) are fatal when
// running in this server mode. This is done to avoid potential
// data corruption because there is no foolproof way to recover
// state after a panic.
make_panics_fatal();
let common_state = CommonServerState::from_config(config.run_config.clone())?;

View File

@ -72,7 +72,10 @@ pub async fn command(config: Config) -> Result<()> {
);
}
// Ensure panics are fatal when running in this server mode.
// Ensure panics (even in threads or tokio tasks) are fatal when
// running in this server mode. This is done to avoid potential
// data corruption because there is no foolproof way to recover
// state after a panic.
make_panics_fatal();
let common_state = CommonServerState::from_config(config.run_config.clone())?;

View File

@ -746,6 +746,20 @@ mod test {
"SELECT SUM(field_f64::float) AS SUM_field_f64, SUM(field_i64::integer) AS SUM_field_i64, SUM(shared_field0::float) AS SUM_shared_field0 FROM temp_01"
);
let stmt = parse_select("SELECT * FROM merge_00, merge_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT col0::float AS col0, col0::tag AS col0_1, col1::float AS col1, col1::tag AS col1_1, col2::string AS col2, col3::string AS col3 FROM merge_00, merge_01"
);
let stmt = parse_select("SELECT /col0/ FROM merge_00, merge_01");
let stmt = rewrite_statement(&namespace, &stmt).unwrap();
assert_eq!(
stmt.to_string(),
"SELECT col0::float AS col0, col0::tag AS col0_1 FROM merge_00, merge_01"
);
// Fallible cases
let stmt = parse_select("SELECT *::field + *::tag FROM cpu");

View File

@ -114,6 +114,29 @@ pub(crate) mod database {
.with_string_field_column_with_stats("shared_field0", None, None)
.with_one_row_of_data(),
),
// Schemas for testing clashing column names when merging across measurements
Arc::new(
TestChunk::new("merge_00")
.with_id(next_chunk_id())
.with_quiet()
.with_time_column()
.with_tag_column("col0")
.with_f64_field_column("col1")
.with_bool_field_column("col2")
.with_string_field_column_with_stats("col3", None, None)
.with_one_row_of_data(),
),
Arc::new(
TestChunk::new("merge_01")
.with_id(next_chunk_id())
.with_quiet()
.with_time_column()
.with_tag_column("col1")
.with_f64_field_column("col0")
.with_bool_field_column("col3")
.with_string_field_column_with_stats("col2", None, None)
.with_one_row_of_data(),
),
]
}
}

View File

@ -208,6 +208,7 @@ pub async fn create_compactor2_server_type(
min_num_l1_files_to_compact: compactor_config.min_num_l1_files_to_compact,
process_once: compactor_config.process_once,
simulate_without_object_store: false,
parquet_files_sink_override: None,
all_errors_are_fatal: false,
});

View File

@ -66,7 +66,6 @@ prost-types = { version = "0.11", features = ["std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "small_rng", "std", "std_rng"] }
rand_core = { version = "0.6", default-features = false, features = ["alloc", "getrandom", "std"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-automata = { version = "0.1", features = ["regex-syntax", "std"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "hyper-rustls", "json", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "stream", "tokio-rustls", "tokio-util", "wasm-streams", "webpki-roots"] }
ring = { version = "0.16", features = ["alloc", "dev_urandom_fallback", "once_cell", "std"] }
@ -177,13 +176,15 @@ once_cell = { version = "1", default-features = false, features = ["unstable"] }
scopeguard = { version = "1", features = ["use_std"] }
tokio = { version = "1", default-features = false, features = ["windows-sys"] }
winapi = { version = "0.3", default-features = false, features = ["activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
windows-sys = { version = "0.42", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_Console", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming", "Win32_UI", "Win32_UI_Input", "Win32_UI_Input_KeyboardAndMouse"] }
windows-sys-b32c9ddb6d93a9d2 = { package = "windows-sys", version = "0.42", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_Console", "Win32_System_IO", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming", "Win32_UI", "Win32_UI_Input", "Win32_UI_Input_KeyboardAndMouse"] }
windows-sys-53888c27b7ba5cf4 = { package = "windows-sys", version = "0.45", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_Console", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_WindowsProgramming"] }
[target.x86_64-pc-windows-msvc.build-dependencies]
once_cell = { version = "1", default-features = false, features = ["unstable"] }
scopeguard = { version = "1", features = ["use_std"] }
tokio = { version = "1", default-features = false, features = ["windows-sys"] }
winapi = { version = "0.3", default-features = false, features = ["activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
windows-sys = { version = "0.42", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_Console", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming", "Win32_UI", "Win32_UI_Input", "Win32_UI_Input_KeyboardAndMouse"] }
windows-sys-b32c9ddb6d93a9d2 = { package = "windows-sys", version = "0.42", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_Console", "Win32_System_IO", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming", "Win32_UI", "Win32_UI_Input", "Win32_UI_Input_KeyboardAndMouse"] }
windows-sys-53888c27b7ba5cf4 = { package = "windows-sys", version = "0.45", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_Console", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_SystemServices", "Win32_System_Threading", "Win32_System_WindowsProgramming"] }
### END HAKARI SECTION