refactor: replace `croaring` with `arrow` (#5910)
* refactor: replace `croaring` with `roaring` With the read buffer gone, roaring bitmaps are only used to calculate series sets, and these calculations are pretty much possible with the pure-Rust version. Also, I don't deem that performance-critical (compared to the roaring bitmaps in the read buffer core). This removes a bunch of dependencies, mostly because `bindgen` is gone. This also removes our "croaring architecture detection" hack. * refactor: replace manual roaring sets with arrow Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> pull/24376/head
parent
d76d787630
commit
04320aced1
|
|
@ -7,10 +7,3 @@ rustflags = [
|
|||
# Enable all features supported by CPUs more recent than haswell (2013)
|
||||
"-C", "target-cpu=haswell"
|
||||
]
|
||||
|
||||
[env]
|
||||
|
||||
# set `ROARING_ARCH` in all processes run by cargo
|
||||
# workaround dynamic CPU detection bug in croaring
|
||||
# https://github.com/influxdata/influxdb_iox/pull/2119
|
||||
ROARING_ARCH = { value = "haswell", force = true }
|
||||
|
|
|
|||
|
|
@ -396,25 +396,6 @@ version = "0.13.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
|
||||
|
||||
[[package]]
|
||||
name = "bindgen"
|
||||
version = "0.59.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2bd2a9a458e8f4304c52c43ebb0cfbd520289f8379a52e329a38afda99bf8eb8"
|
||||
dependencies = [
|
||||
"bitflags",
|
||||
"cexpr",
|
||||
"clang-sys",
|
||||
"lazy_static",
|
||||
"lazycell",
|
||||
"peeking_take_while",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex",
|
||||
"rustc-hash",
|
||||
"shlex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
|
|
@ -567,15 +548,6 @@ dependencies = [
|
|||
"jobserver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cexpr"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
|
||||
dependencies = [
|
||||
"nom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
version = "1.0.0"
|
||||
|
|
@ -635,17 +607,6 @@ dependencies = [
|
|||
"half 1.8.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clang-sys"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fa2e27ae6ab525c3d369ded447057bca5438d86dc3a68f6faafb8269ba82ebf3"
|
||||
dependencies = [
|
||||
"glob",
|
||||
"libc",
|
||||
"libloading",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "clap"
|
||||
version = "3.2.22"
|
||||
|
|
@ -999,28 +960,6 @@ dependencies = [
|
|||
"itertools",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "croaring"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "665ea5db3d1532cb217d2db42a412bca62a2c6e9f21d5f51c918dcdb319224bf"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"croaring-sys",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "croaring-sys"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2674a6d59ece772339f5e86ccf6baaf3365effd78bfb7dd4da3b2e2cc0bc7146"
|
||||
dependencies = [
|
||||
"bindgen",
|
||||
"cc",
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-channel"
|
||||
version = "0.5.6"
|
||||
|
|
@ -2488,7 +2427,6 @@ dependencies = [
|
|||
"arrow_util",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"croaring",
|
||||
"data_types",
|
||||
"datafusion 0.1.0",
|
||||
"datafusion_util",
|
||||
|
|
@ -2769,12 +2707,6 @@ version = "1.4.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
|
||||
[[package]]
|
||||
name = "lazycell"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||
|
||||
[[package]]
|
||||
name = "lexical-core"
|
||||
version = "0.8.5"
|
||||
|
|
@ -2845,16 +2777,6 @@ version = "0.2.135"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c"
|
||||
|
||||
[[package]]
|
||||
name = "libloading"
|
||||
version = "0.7.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "efbc0f03f9a775e9f6aed295c6a1ba2253c5757a9e03d55c6caa46a681abcddd"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libm"
|
||||
version = "0.2.5"
|
||||
|
|
@ -3587,12 +3509,6 @@ dependencies = [
|
|||
"fixedbitset",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "peeking_take_while"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.2.0"
|
||||
|
|
@ -4372,12 +4288,6 @@ version = "0.1.21"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
version = "0.4.0"
|
||||
|
|
@ -4749,12 +4659,6 @@ dependencies = [
|
|||
"workspace-hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.0"
|
||||
|
|
|
|||
17
README.md
17
README.md
|
|
@ -74,22 +74,9 @@ Follow the instructions for your operating system on the `rustup` site.
|
|||
|
||||
`rustup` will check the [`rust-toolchain`](./rust-toolchain.toml) file and automatically install and use the correct Rust version for you.
|
||||
|
||||
#### Clang
|
||||
#### C/C++ Compiler
|
||||
|
||||
Building InfluxDB IOx requires `clang` (for the [`croaring`] dependency).
|
||||
Check for `clang` by running `clang --version`.
|
||||
|
||||
```shell
|
||||
clang --version
|
||||
Apple clang version 12.0.0 (clang-1200.0.32.27)
|
||||
Target: x86_64-apple-darwin20.1.0
|
||||
Thread model: posix
|
||||
InstalledDir: /Library/Developer/CommandLineTools/usr/bin
|
||||
```
|
||||
|
||||
If `clang` is not already present, it can typically be installed with the system package manager.
|
||||
|
||||
[`croaring`]: https://github.com/saulius/croaring-rs
|
||||
You need a C/C++ compiler for some non-Rust dependencies such as [`zstd`](https://crates.io/crates/zstd).
|
||||
|
||||
#### lld
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ arrow = { version = "25.0.0", features = ["prettyprint"] }
|
|||
arrow_util = { path = "../arrow_util" }
|
||||
async-trait = "0.1"
|
||||
chrono = { version = "0.4", default-features = false }
|
||||
croaring = "0.6"
|
||||
data_types = { path = "../data_types" }
|
||||
datafusion = { path = "../datafusion" }
|
||||
datafusion_util = { path = "../datafusion_util" }
|
||||
|
|
|
|||
|
|
@ -4,19 +4,16 @@
|
|||
|
||||
use arrow::{
|
||||
self,
|
||||
array::{Array, DictionaryArray, StringArray},
|
||||
array::{Array, ArrayRef, BooleanArray, DictionaryArray, StringArray},
|
||||
datatypes::{DataType, Int32Type},
|
||||
record_batch::RecordBatch,
|
||||
};
|
||||
use datafusion::physical_plan::{common::collect, SendableRecordBatchStream};
|
||||
|
||||
use observability_deps::tracing::trace;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::error::SendError;
|
||||
|
||||
use croaring::bitmap::Bitmap;
|
||||
|
||||
use crate::exec::{
|
||||
field::{self, FieldColumns, FieldIndexes},
|
||||
seriesset::series::Group,
|
||||
|
|
@ -116,49 +113,60 @@ impl SeriesSetConverter {
|
|||
// Algorithm: compute, via bitsets, the rows at which each
|
||||
// tag column changes and thereby where the tagset
|
||||
// changes. Emit a new SeriesSet at each such transition
|
||||
let mut tag_transitions = tag_indexes
|
||||
let tag_transitions = tag_indexes
|
||||
.iter()
|
||||
.map(|&col| Self::compute_transitions(&batch, col))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// no tag columns, emit a single tagset
|
||||
let intersections = if tag_transitions.is_empty() {
|
||||
let mut b = Bitmap::create_with_capacity(1);
|
||||
let end_row = batch.num_rows();
|
||||
b.add(end_row as u32);
|
||||
b
|
||||
let intersections: Vec<usize> = if tag_transitions.is_empty() {
|
||||
vec![batch.num_rows()]
|
||||
} else {
|
||||
// OR bitsets together to find all rows where the
|
||||
// keyset (values of the tag keys) changes
|
||||
let remaining = tag_transitions.split_off(1);
|
||||
|
||||
remaining
|
||||
.into_iter()
|
||||
.for_each(|b| tag_transitions[0].or_inplace(&b));
|
||||
// take the first item
|
||||
tag_transitions.into_iter().next().unwrap()
|
||||
let mut tag_transitions_it = tag_transitions.into_iter();
|
||||
let init = tag_transitions_it.next().expect("not empty");
|
||||
let intersections = tag_transitions_it.fold(init, |a, b| {
|
||||
Arc::new(
|
||||
arrow::compute::or(
|
||||
a.as_any()
|
||||
.downcast_ref::<BooleanArray>()
|
||||
.expect("boolean array"),
|
||||
b.as_any()
|
||||
.downcast_ref::<BooleanArray>()
|
||||
.expect("boolean array"),
|
||||
)
|
||||
.expect("or operation"),
|
||||
)
|
||||
});
|
||||
let intersections = intersections
|
||||
.as_any()
|
||||
.downcast_ref::<BooleanArray>()
|
||||
.expect("boolean array");
|
||||
intersections
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter(|(_idx, mask)| mask.unwrap_or(true))
|
||||
.map(|(idx, _mask)| idx)
|
||||
.chain(std::iter::once(batch.num_rows()))
|
||||
.collect()
|
||||
};
|
||||
|
||||
let mut start_row: u32 = 0;
|
||||
let mut start_row: usize = 0;
|
||||
|
||||
// create each series (since bitmap are not Send, we can't
|
||||
// call await during the loop)
|
||||
|
||||
// emit each series
|
||||
let series_sets = intersections
|
||||
.iter()
|
||||
.into_iter()
|
||||
.map(|end_row| {
|
||||
let series_set = SeriesSet {
|
||||
table_name: Arc::clone(&table_name),
|
||||
tags: Self::get_tag_keys(
|
||||
&batch,
|
||||
start_row as usize,
|
||||
&tag_columns,
|
||||
&tag_indexes,
|
||||
),
|
||||
tags: Self::get_tag_keys(&batch, start_row, &tag_columns, &tag_indexes),
|
||||
field_indexes: field_indexes.clone(),
|
||||
start_row: start_row as usize,
|
||||
num_rows: (end_row - start_row) as usize,
|
||||
start_row,
|
||||
num_rows: (end_row - start_row),
|
||||
batch: batch.clone(),
|
||||
};
|
||||
|
||||
|
|
@ -176,92 +184,25 @@ impl SeriesSetConverter {
|
|||
///
|
||||
/// Note: This may return false positives in the presence of dictionaries
|
||||
/// containing duplicates
|
||||
fn compute_transitions(batch: &RecordBatch, col_idx: usize) -> Bitmap {
|
||||
fn compute_transitions(batch: &RecordBatch, col_idx: usize) -> ArrayRef {
|
||||
let num_rows = batch.num_rows();
|
||||
|
||||
let mut bitmap = Bitmap::create_with_capacity(num_rows as u32);
|
||||
if num_rows < 1 {
|
||||
return bitmap;
|
||||
if num_rows == 0 {
|
||||
return Arc::new(BooleanArray::builder(0).finish());
|
||||
}
|
||||
|
||||
// otherwise, scan the column for transitions
|
||||
let col = batch.column(col_idx);
|
||||
match col.data_type() {
|
||||
DataType::Utf8 => {
|
||||
let col = col
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.expect("Casting column");
|
||||
let mut current_val = col.value(0);
|
||||
for row in 1..num_rows {
|
||||
let next_val = col.value(row);
|
||||
if next_val != current_val {
|
||||
bitmap.add(row as u32);
|
||||
current_val = next_val;
|
||||
}
|
||||
}
|
||||
}
|
||||
DataType::Dictionary(key, value)
|
||||
if key.as_ref() == &DataType::Int32 && value.as_ref() == &DataType::Utf8 =>
|
||||
{
|
||||
let col = col
|
||||
.as_any()
|
||||
.downcast_ref::<DictionaryArray<Int32Type>>()
|
||||
.expect("Casting column");
|
||||
let keys = col.keys();
|
||||
let get_key = |idx| {
|
||||
if col.is_valid(idx) {
|
||||
return Some(keys.value(idx));
|
||||
}
|
||||
None
|
||||
};
|
||||
|
||||
let col_values = col.values();
|
||||
let values = col_values
|
||||
.as_any()
|
||||
.downcast_ref::<StringArray>()
|
||||
.expect("Casting values column failed");
|
||||
|
||||
let mut current_val = get_key(0);
|
||||
for row in 1..num_rows {
|
||||
let next_val = get_key(row);
|
||||
if next_val != current_val {
|
||||
//
|
||||
// N.B, concatenating two Arrow dictionary arrays can
|
||||
// result in duplicate values with differing keys.
|
||||
// Therefore, when keys differ we should verify they are
|
||||
// encoding different values. See:
|
||||
// https://github.com/apache/arrow-rs/pull/15
|
||||
//
|
||||
if let (Some(curr), Some(next)) = (current_val, next_val) {
|
||||
if values.value(curr as usize) == values.value(next as usize) {
|
||||
// these logical values are the same even though
|
||||
// they have different encoded keys.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
bitmap.add(row as u32);
|
||||
current_val = next_val;
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => unimplemented!(
|
||||
"Series transition calculations not supported for tag type {:?} in column {:?}",
|
||||
col.data_type(),
|
||||
batch.schema().fields()[col_idx]
|
||||
),
|
||||
}
|
||||
|
||||
// for now, always treat the last row as ending a series
|
||||
bitmap.add(num_rows as u32);
|
||||
|
||||
trace!(
|
||||
rows = ?bitmap.to_vec(),
|
||||
?col_idx,
|
||||
"row transitions for results"
|
||||
);
|
||||
bitmap
|
||||
arrow::compute::concat(&[
|
||||
&{
|
||||
let mut b = BooleanArray::builder(1);
|
||||
b.append_value(false);
|
||||
b.finish()
|
||||
},
|
||||
&arrow::compute::neq_dyn(&col.slice(0, col.len() - 1), &col.slice(1, col.len() - 1))
|
||||
.expect("cmp"),
|
||||
])
|
||||
.expect("concat")
|
||||
}
|
||||
|
||||
/// Creates (column_name, column_value) pairs for each column
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
|||
once_cell = { version = "1", default-features = false, features = ["unstable"] }
|
||||
scopeguard = { version = "1", features = ["use_std"] }
|
||||
tokio = { version = "1", default-features = false, features = ["winapi"] }
|
||||
winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "libloaderapi", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
|
||||
winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
|
||||
windows-sys = { version = "0.36", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming"] }
|
||||
|
||||
[target.x86_64-pc-windows-msvc.build-dependencies]
|
||||
|
|
@ -168,7 +168,7 @@ getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
|||
once_cell = { version = "1", default-features = false, features = ["unstable"] }
|
||||
scopeguard = { version = "1", features = ["use_std"] }
|
||||
tokio = { version = "1", default-features = false, features = ["winapi"] }
|
||||
winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "libloaderapi", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
|
||||
winapi = { version = "0.3", default-features = false, features = ["accctrl", "aclapi", "activation", "basetsd", "combaseapi", "consoleapi", "errhandlingapi", "fileapi", "handleapi", "impl-debug", "impl-default", "knownfolders", "minwinbase", "minwindef", "namedpipeapi", "ntsecapi", "ntstatus", "objbase", "processenv", "roapi", "shellapi", "shlobj", "std", "stringapiset", "synchapi", "timezoneapi", "winbase", "wincon", "winerror", "winnt", "winreg", "winstring", "winuser", "ws2ipdef", "ws2tcpip", "wtypesbase"] }
|
||||
windows-sys = { version = "0.36", features = ["Win32", "Win32_Foundation", "Win32_Networking", "Win32_Networking_WinSock", "Win32_Security", "Win32_Storage", "Win32_Storage_FileSystem", "Win32_System", "Win32_System_IO", "Win32_System_LibraryLoader", "Win32_System_Pipes", "Win32_System_SystemServices", "Win32_System_WindowsProgramming"] }
|
||||
|
||||
### END HAKARI SECTION
|
||||
|
|
|
|||
Loading…
Reference in New Issue