From 04320aced1cb520cd2a739e4f9fc47d5f751cbfa Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 20 Oct 2022 10:45:41 +0000 Subject: [PATCH] refactor: replace `croaring` with `arrow` (#5910) * refactor: replace `croaring` with `roaring` With the read buffer gone, roaring bitmaps are only used to calculate series sets and these calculations are pretty much possible with the pure-Rust version. Also I don't deem that that performance-critical (compared to the roaring bitmaps in the read buffer core). This removes a bunch of dependencies, mostly because `bindgen` is gone. This also removes our "croaring architecture detection" hack. * refactor: replace manual roaring sets with arrow Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .cargo/config | 7 - Cargo.lock | 96 -------------- README.md | 17 +-- iox_query/Cargo.toml | 1 - iox_query/src/exec/seriesset/converter.rs | 155 +++++++--------------- workspace-hack/Cargo.toml | 4 +- 6 files changed, 52 insertions(+), 228 deletions(-) diff --git a/.cargo/config b/.cargo/config index 73c1d0be9b..adecf3dba4 100644 --- a/.cargo/config +++ b/.cargo/config @@ -7,10 +7,3 @@ rustflags = [ # Enable all features supported by CPUs more recent than haswell (2013) "-C", "target-cpu=haswell" ] - -[env] - -# set `ROARING_ARCH` in all processes run by cargo -# workaround dynamic CPU detection bug in croaring -# https://github.com/influxdata/influxdb_iox/pull/2119 -ROARING_ARCH = { value = "haswell", force = true } diff --git a/Cargo.lock b/Cargo.lock index bc0513c448..199fcdf609 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -396,25 +396,6 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" -[[package]] -name = "bindgen" -version = "0.59.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8" -dependencies = [ - "bitflags", - "cexpr", - "clang-sys", - "lazy_static", - "lazycell", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -567,15 +548,6 @@ dependencies = [ "jobserver", ] -[[package]] -name = "cexpr" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" -dependencies = [ - "nom", -] - [[package]] name = "cfg-if" version = "1.0.0" @@ -635,17 +607,6 @@ dependencies = [ "half 1.8.2", ] -[[package]] -name = "clang-sys" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa2e27ae6ab525c3d369ded447057bca5438d86dc3a68f6faafb8269ba82ebf3" -dependencies = [ - "glob", - "libc", - "libloading", -] - [[package]] name = "clap" version = "3.2.22" @@ -999,28 +960,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "croaring" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "665ea5db3d1532cb217d2db42a412bca62a2c6e9f21d5f51c918dcdb319224bf" -dependencies = [ - "byteorder", - "croaring-sys", - "libc", -] - -[[package]] -name = "croaring-sys" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2674a6d59ece772339f5e86ccf6baaf3365effd78bfb7dd4da3b2e2cc0bc7146" -dependencies = [ - "bindgen", - "cc", - "libc", -] - [[package]] name = "crossbeam-channel" version = "0.5.6" @@ -2488,7 +2427,6 @@ dependencies = [ "arrow_util", "async-trait", "chrono", - "croaring", "data_types", "datafusion 0.1.0", "datafusion_util", @@ -2769,12 +2707,6 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "lexical-core" version = "0.8.5" @@ -2845,16 +2777,6 @@ version = "0.2.135" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c" -[[package]] -name = "libloading" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd" -dependencies = [ - "cfg-if", - "winapi", -] - [[package]] name = "libm" version = "0.2.5" @@ -3587,12 +3509,6 @@ dependencies = [ "fixedbitset", ] -[[package]] -name = "peeking_take_while" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" - [[package]] name = "percent-encoding" version = "2.2.0" @@ -4372,12 +4288,6 @@ version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "rustc_version" version = "0.4.0" @@ -4749,12 +4659,6 @@ dependencies = [ "workspace-hack", ] -[[package]] -name = "shlex" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" - [[package]] name = "signal-hook-registry" version = "1.4.0" diff --git a/README.md b/README.md index ef028d77ca..25f1616240 100644 --- a/README.md +++ b/README.md @@ -74,22 +74,9 @@ Follow the instructions for your operating system on the `rustup` site. `rustup` will check the [`rust-toolchain`](./rust-toolchain.toml) file and automatically install and use the correct Rust version for you. -#### Clang +#### C/C++ Compiler -Building InfluxDB IOx requires `clang` (for the [`croaring`] dependency). -Check for `clang` by running `clang --version`. - -```shell -clang --version -Apple clang version 12.0.0 (clang-1200.0.32.27) -Target: x86_64-apple-darwin20.1.0 -Thread model: posix -InstalledDir: /Library/Developer/CommandLineTools/usr/bin -``` - -If `clang` is not already present, it can typically be installed with the system package manager. - -[`croaring`]: https://github.com/saulius/croaring-rs +You need some C/C++ compiler for some non-Rust dependencies like [`zstd`](https://crates.io/crates/zstd). #### lld diff --git a/iox_query/Cargo.toml b/iox_query/Cargo.toml index 1468c68104..783a073dff 100644 --- a/iox_query/Cargo.toml +++ b/iox_query/Cargo.toml @@ -18,7 +18,6 @@ arrow = { version = "25.0.0", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } async-trait = "0.1" chrono = { version = "0.4", default-features = false } -croaring = "0.6" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } datafusion_util = { path = "../datafusion_util" } diff --git a/iox_query/src/exec/seriesset/converter.rs b/iox_query/src/exec/seriesset/converter.rs index ca6be3acde..012c28ac0e 100644 --- a/iox_query/src/exec/seriesset/converter.rs +++ b/iox_query/src/exec/seriesset/converter.rs @@ -4,19 +4,16 @@ use arrow::{ self, - array::{Array, DictionaryArray, StringArray}, + array::{Array, ArrayRef, BooleanArray, DictionaryArray, StringArray}, datatypes::{DataType, Int32Type}, record_batch::RecordBatch, }; use datafusion::physical_plan::{common::collect, SendableRecordBatchStream}; -use observability_deps::tracing::trace; use snafu::{OptionExt, ResultExt, Snafu}; use std::sync::Arc; use tokio::sync::mpsc::error::SendError; -use croaring::bitmap::Bitmap; - use crate::exec::{ field::{self, FieldColumns, FieldIndexes}, seriesset::series::Group, @@ -116,49 +113,60 @@ impl SeriesSetConverter { // Algorithm: compute, via bitsets, the rows at which each // tag column changes and thereby where the tagset // changes. Emit a new SeriesSet at each such transition - let mut tag_transitions = tag_indexes + let tag_transitions = tag_indexes .iter() .map(|&col| Self::compute_transitions(&batch, col)) .collect::>(); // no tag columns, emit a single tagset - let intersections = if tag_transitions.is_empty() { - let mut b = Bitmap::create_with_capacity(1); - let end_row = batch.num_rows(); - b.add(end_row as u32); - b + let intersections: Vec = if tag_transitions.is_empty() { + vec![batch.num_rows()] } else { // OR bitsets together to to find all rows where the // keyset (values of the tag keys) changes - let remaining = tag_transitions.split_off(1); - - remaining - .into_iter() - .for_each(|b| tag_transitions[0].or_inplace(&b)); - // take the first item - tag_transitions.into_iter().next().unwrap() + let mut tag_transitions_it = tag_transitions.into_iter(); + let init = tag_transitions_it.next().expect("not empty"); + let intersections = tag_transitions_it.fold(init, |a, b| { + Arc::new( + arrow::compute::or( + a.as_any() + .downcast_ref::() + .expect("boolean array"), + b.as_any() + .downcast_ref::() + .expect("boolean array"), + ) + .expect("or operation"), + ) + }); + let intersections = intersections + .as_any() + .downcast_ref::() + .expect("boolean array"); + intersections + .iter() + .enumerate() + .filter(|(_idx, mask)| mask.unwrap_or(true)) + .map(|(idx, _mask)| idx) + .chain(std::iter::once(batch.num_rows())) + .collect() }; - let mut start_row: u32 = 0; + let mut start_row: usize = 0; // create each series (since bitmap are not Send, we can't // call await during the loop) // emit each series let series_sets = intersections - .iter() + .into_iter() .map(|end_row| { let series_set = SeriesSet { table_name: Arc::clone(&table_name), - tags: Self::get_tag_keys( - &batch, - start_row as usize, - &tag_columns, - &tag_indexes, - ), + tags: Self::get_tag_keys(&batch, start_row, &tag_columns, &tag_indexes), field_indexes: field_indexes.clone(), - start_row: start_row as usize, - num_rows: (end_row - start_row) as usize, + start_row, + num_rows: (end_row - start_row), batch: batch.clone(), }; @@ -176,92 +184,25 @@ impl SeriesSetConverter { /// /// Note: This may return false positives in the presence of dictionaries /// containing duplicates - fn compute_transitions(batch: &RecordBatch, col_idx: usize) -> Bitmap { + fn compute_transitions(batch: &RecordBatch, col_idx: usize) -> ArrayRef { let num_rows = batch.num_rows(); - let mut bitmap = Bitmap::create_with_capacity(num_rows as u32); - if num_rows < 1 { - return bitmap; + if num_rows == 0 { + return Arc::new(BooleanArray::builder(0).finish()); } - // otherwise, scan the column for transitions let col = batch.column(col_idx); - match col.data_type() { - DataType::Utf8 => { - let col = col - .as_any() - .downcast_ref::() - .expect("Casting column"); - let mut current_val = col.value(0); - for row in 1..num_rows { - let next_val = col.value(row); - if next_val != current_val { - bitmap.add(row as u32); - current_val = next_val; - } - } - } - DataType::Dictionary(key, value) - if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 => - { - let col = col - .as_any() - .downcast_ref::>() - .expect("Casting column"); - let keys = col.keys(); - let get_key = |idx| { - if col.is_valid(idx) { - return Some(keys.value(idx)); - } - None - }; - let col_values = col.values(); - let values = col_values - .as_any() - .downcast_ref::() - .expect("Casting values column failed"); - - let mut current_val = get_key(0); - for row in 1..num_rows { - let next_val = get_key(row); - if next_val != current_val { - // - // N.B, concatenating two Arrow dictionary arrays can - // result in duplicate values with differing keys. - // Therefore, when keys differ we should verify they are - // encoding different values. See: - // https://github.com/apache/arrow-rs/pull/15 - // - if let (Some(curr), Some(next)) = (current_val, next_val) { - if values.value(curr as usize) == values.value(next as usize) { - // these logical values are the same even though - // they have different encoded keys. - continue; - } - } - - bitmap.add(row as u32); - current_val = next_val; - } - } - } - _ => unimplemented!( - "Series transition calculations not supported for tag type {:?} in column {:?}", - col.data_type(), - batch.schema().fields()[col_idx] - ), - } - - // for now, always treat the last row as ending a series - bitmap.add(num_rows as u32); - - trace!( - rows = ?bitmap.to_vec(), - ?col_idx, - "row transitions for results" - ); - bitmap + arrow::compute::concat(&[ + &{ + let mut b = BooleanArray::builder(1); + b.append_value(false); + b.finish() + }, + &arrow::compute::neq_dyn(&col.slice(0, col.len() - 1), &col.slice(1, col.len() - 1)) + .expect("cmp"), + ]) + .expect("concat") } /// Creates (column_name, column_value) pairs for each column diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 5725e70b08..14e3afa613 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -160,7 +160,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] } once_cell = { version = "1", default-features = false, features = ["unstable"] } scopeguard = { version = "1", features = ["use_std"] } tokio = { version = "1", default-features = false, features = ["winapi"] } -winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "libloaderapi", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] } +winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] } windows-sys = { version = "0.36", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming"] } [target.x86_64-pc-windows-msvc.build-dependencies] @@ -168,7 +168,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] } once_cell = { version = "1", default-features = false, features = ["unstable"] } scopeguard = { version = "1", features = ["use_std"] } tokio = { version = "1", default-features = false, features = ["winapi"] } -winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "libloaderapi", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] } +winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] } windows-sys = { version = "0.36", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming"] } ### END HAKARI SECTION